[  Ask: Send-And-Receive-Future ] 是Akka官方首页对使用Ask的send-and-receive-future的方式简单介绍和举例。

原文地址为:http://doc.akka.io/docs/akka/snapshot/scala/actors.html#Ask__Send-And-Receive-Future

为什么我会翻译一下这一部分?

最近在看scala的akka库,中文资料少的可怜,直接看英文的比较好。因为很多所谓的中文资料也就是从官方抄几个demo,然后放出来就完了,实际上能不能运行还是个问题。我在看ask这部分的时候看别人的示例都无法正常的运行,就去认真看了一下官方的介绍(仅仅是个介绍而已),顺便记录一下,用来加深记忆。

翻译谈不上,用蹩脚的英文自己去理解一下。

内容:

Akka ask消息模式相比Fire-Forget模式,它包含了actor和future两个部分(Fire-Forget看做一个ActorRef的对象的方法使用的消息发送模式),因此,ask不是作为ActorRef的一个方法给出,而是作为akka.pattern.ask 的一个成员(可以说是一个方法)给出。

官方示例:

 

import akka.pattern.{ ask, pipe }
import system.dispatcher // The ExecutionContext that will be used
final case class Result(x: Int, s: String, d: Double)
case object Request
 
implicit val timeout = Timeout(5 seconds) // needed for `?` below
 
val f: Future[Result] =
  for {
    x < - ask(actorA, Request).mapTo[Int] // call pattern directly
    s < - (actorB ask Request).mapTo[String] // call by implicit conversion
    d < - (actorC ? Request).mapTo[Double] // call by symbolic name
  } yield Result(x, s, d)
 
f pipeTo actorD // .. or ..
pipe(f) to actorD

 

上面例子演示了ask 和 pipeTo 一起基于Future的使用,因为这样的组合可能是在使用过程中最常见的这样的。需要注意的是,上面所述(例子中三个发送)都是完全非阻塞和异步的:每个ask产生Future,而x,s,d通过for...yield重新组成一个新的Future,然后通过pipeTo()对产生的Result进行聚合处理,再次发送给另外一个Actor

使用ask发送的消息和tell()发送消息是一样的,并且接收到消息的Actor必须通过sender()!回复消息,从而得到一个有结果的Future。ask消息发送模式会创建一个内部的Actor来处理消息的回复,这个内部的Actor是需要一个timeout,如果超时则进行销毁,防止资源泄露(就是为什么很多例子都有timeout的隐式值)。

Warning:

如果发生了ask消息发送后发生了异常,需要向sender发送一个失败(Failure)的消息。Actor在处理消息时无法自动回复Failure异常信息,因此需要try...catch


try {
  val result = operation()
  sender() ! result
} catch {
  case e: Exception =>
    sender() ! akka.actor.Status.Failure(e)
    throw e
}

如果actor没有正常处理消息并返回future,超过设定的时间释放资源,抛出AskTimeoutException异常。timeout的超时时间根据优先级,从下面的顺序中得到。

1、显示指定timeout


import scala.concurrent.duration._
import akka.pattern.ask
val future = myActor.ask("hello")(5 seconds)

 

2、由akka.util.Timeout隐式指定超时

Future的onComplete、onSuccess、onFailure方法(如果ajax异步调用)能够得到消息发送的状态,从而能够通过状态进行操作,避免阻塞。


import scala.concurrent.duration._
import akka.util.Timeout
import akka.pattern.ask
implicit val timeout = Timeout(5 seconds)
val future = myActor ? "hello"

 

Warning

当使用Future的回调方法(像是上面说的三个方法)时,需要当心把包含了actor的引用给释放了(使用context.stop或者其他关闭的方法),也就是在回调方法里面,不要调用actor中能访问可变状态的方法。否则会破坏actor的封装和引入同步bug和资源竞争(条件竞争),因为callback是并发的调度封装的actor。

遗憾的无法在在编译的时候检测出非访问。