这个示例是我在:http://blog.csdn.net/lovehuangjiaju/article/details/51582326 看到的,是一个很不错的Ask Send-And-Receive-Future的示例,特此记一下。

关于ask :Send-And-Receive-Future可以看看我翻译的官方的介绍:https://www.ptbird.cn/akka-ask-translatation/

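关键有两点:一是构建隐式值 timeout(`5 seconds` 这种写法需要引入 scala.concurrent.duration._);二是必须提供隐式的 ExecutionContext(import scala.concurrent.ExecutionContext.Implicits.global)。后者并不是异常处理,而是 Future 的 for 推导式(map/flatMap)以及 pipe 运行回调时所需的执行上下文,缺少它代码无法通过编译。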

示例如下:

 

import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorSystem, Props}
import akka.event.Logging
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.{ask, pipe}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
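// Without the import above the code does not compile: the Future for-comprehension and `pipe` both need an implicit ExecutionContext.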
/**
  * Created by Postbird on 2016/11/28.
  */
/*
  *    消息处理:?(Send-And-Receive-Future)
  */
/**
  * Message: a person's basic information.
  *
  * Case-class parameters are already public, immutable fields, so no explicit
  * `val` is needed; the class is `final` because case classes should not be extended.
  *
  * @param id   identifier of the person
  * @param name name of the person
  * @param age  age of the person
  */
final case class BasicInfo(id: Int, name: String, age: Int)
/**
  * Message: a person's interest (hobby) information.
  *
  * Case-class parameters are already public, immutable fields, so no explicit
  * `val` is needed; the class is `final` because case classes should not be extended.
  *
  * @param id       identifier of the person
  * @param interest description of the person's interest
  */
final case class InterestInfo(id: Int, interest: String)
/**
  * Message: the complete information about a person, combining the basic
  * information with the interest information.
  *
  * Declared `final` because case classes should not be extended.
  *
  * @param basicInfo    the person's basic information
  * @param interestInfo the person's interest information
  */
final case class Person(basicInfo: BasicInfo, interestInfo: InterestInfo)
/**
  * Actor that answers an `Int` id with a [[BasicInfo]] message.
  *
  * The reply carries the received id together with a fixed sample name and age.
  * Any other message is only logged.
  */
class BasicInfoActor extends Actor {
    val log = Logging(context.system, this)

    override def receive: Receive = {
        case id: Int =>
            // Log the incoming id, then reply to whoever sent it.
            log.info(s"id=$id")
            sender() ! BasicInfo(id, "posthird", 22)
        case _ =>
            log.info("received unknown message!")
    }
}
/**
  * Actor that answers an `Int` id with an [[InterestInfo]] message.
  *
  * The reply carries the received id together with a fixed sample interest.
  * Any other message is only logged.
  */
class InterestInfoActor extends Actor {
    val log = Logging(context.system, this)

    override def receive: Receive = {
        case id: Int =>
            // Log the incoming id, then reply to whoever sent it.
            log.info(s"id=$id")
            sender() ! InterestInfo(id, "篮球")
        case _ =>
            log.info("received unknown message!")
    }
}
/**
  * Actor that receives the assembled [[Person]] message and logs it.
  *
  * Every other message is logged as unknown.
  */
class PersonActor extends Actor {
    val log = Logging(context.system, this)

    override def receive: Receive = {
        case person: Person =>
            log.info(s"Person :$person")
        case _ =>
            log.info("received unknown message")
    }
}
/**
  * Coordinator actor demonstrating the ask pattern (`?`, "Send-And-Receive-Future").
  *
  * On receiving an `Int` id it asks [[BasicInfoActor]] and [[InterestInfoActor]]
  * for their part of the data, combines the two replies into a [[Person]] and
  * pipes the resulting future to [[PersonActor]].
  */
class CombineActor extends Actor {
    // Timeout applied to every ask below. `5.seconds` comes from
    // scala.concurrent.duration._; the method-call form avoids the postfix-operator warning.
    implicit val timeout: Timeout = Timeout(5.seconds)

    val basicInfoActor = context.actorOf(Props[BasicInfoActor], name = "basicInfoActor")
    val interestInfoActor = context.actorOf(Props[InterestInfoActor], name = "interestInfoActor")
    val personActor = context.actorOf(Props[PersonActor], name = "personActor")

    override def receive: Receive = {
        case id: Int =>
            // Start both asks BEFORE the for-comprehension: the two requests are
            // independent, so they are sent right away instead of one after the other.
            // mapTo casts the untyped reply to the expected message type.
            val basicInfoReply: Future[BasicInfo] =
                ask(basicInfoActor, id).mapTo[BasicInfo]
            // The ask can be spelled in three equivalent ways:
            //   ask(interestInfoActor, id)
            //   interestInfoActor ask id
            //   interestInfoActor ? id
            // Only one of them is issued here, so a single request is sent.
            val interestInfoReply: Future[InterestInfo] =
                (interestInfoActor ? id).mapTo[InterestInfo]

            val combineResult: Future[Person] =
                for {
                    basicInfo <- basicInfoReply
                    interestInfo <- interestInfoReply
                } yield Person(basicInfo, interestInfo)

            // Deliver the result of the future to PersonActor once it completes.
            pipe(combineResult).to(personActor)
    }
}
/**
  * Demo entry point: starts an actor system, sends one id to [[CombineActor]],
  * waits five seconds for the messages to be processed and then stops the system.
  */
object AkkaTest_6 extends App {
    val _system = ActorSystem("send-and-receive-Future")
    val combineActor = _system.actorOf(Props[CombineActor], name = "combineActor")
    try {
        combineActor ! 12345
        // Crude wait so the asynchronous message flow can finish before shutdown.
        Thread.sleep(5000)
    } finally {
        // Stop the actor system even if the send or the sleep throws.
        // terminate() replaces shutdown(), which is deprecated since Akka 2.4.
        _system.terminate()
    }
}