2.6.0-RC1でstableになったAkka Typedを試してみる
Scalaの並行・分散処理ライブラリの Akka
には、メッセージパッシングをより型安全に行うことのできる Akka Typed
という機能があります。
先週リリースされた 2.6.0-RC1
にてAPIがstableになりました(production readyは2.5.22から)。
今日は旧来のActor( Akka Classic
)との比較や、TypedとClassicの相互運用を中心に軽く試してみたいと思います。
Classic v.s. Typed
まずはAkka ClassicとTypedの比較からです。
Akka Classic
Akka ClassicのActorはどんなメッセージでも受け取ることができます。柔軟ではありますが、静的型付けの恩恵を受けられないのもまた事実です。
数をカウントする簡単なアプリケーションで確認してみます。
カウンターはこのような感じです。カウントを操作するメッセージと、カウントの返却を要求するメッセージを受け付けます。
class Counter extends Actor { private var _count = 0 // 内部状態 override def receive: Receive = { case Increment => _count += 1 case ReadCount => sender() ! CountResponse(_count) } } object Counter { def props: Props = Props(new Counter) // カウンターが受け付けるメッセージ sealed trait Command case object Increment extends Command case object Decrement extends Command case object ReadCount extends Command // カウンターのレスポンス final case class CountResponse(value: Int) }
このカウンターを利用してみます。2回のインクリメント、1回のデクリメント後にカウントの返却を要求し、結果をprintしているだけです。
object CounterApp extends App { val system = ActorSystem("count-app") implicit val ec: ExecutionContext = system.dispatcher implicit val timeout: Timeout = Timeout(3.seconds) // カウンターのActorを生成 val counter = system.actorOf(Counter.props) counter ! Counter.Increment counter ! Counter.Increment counter ! Counter.Decrement counter.ask(Counter.ReadCount) .mapTo[CountResponse] // Future[Any]をFuture[CountResponse]に変換 .foreach(res => println(s"count: ${res.value}")) system.terminate() }
このコードはコンパイルできますし、実行もできます。
しかし正しく動きません。インクリメントを2回、デクリメントを1回行ったのにカウンターの値は 2
になります。 Decrement
のメッセージをハンドリングをうっかり忘れているためです。
カウンターが受け取るメッセージは sealed trait Command
を継承したADTになっていますが、それでも網羅性は検査できません。
また ask
した結果の Future[Any]
を Future#mapTo[T]
で変換しているところもちょっと怖い感じです。Future[CountResponse]
に変換して問題ないかを静的に検査することはできません((そのため、ask
ではなく tell
しあうようにすることが多いような気がします))。
Akka Typed
Akka Classicでの問題点は、
- Actorが受信するメッセージのハンドリングが網羅的かを検査できない
- Actorからのレスポンスメッセージをリクエスト側でキャストする必要がある
の2点でした。
Akka Typedでは上をクリアするために、 ActorRef
が受信できるメッセージの型引数をとるようになっています。これも簡単なカウンターアプリケーションで見てみましょう。
Akka TypedのAPIには2種類のスタイルがあります。
- Functional style: immutableなstateを受け取り
Behavior
を返す関数を再帰的に呼ぶことで振る舞いを表現するスタイル - Object Oriented style: 内部にmutableなstateを持つ、Akka Classicに似た記法を持つスタイル
まずは後者のOOスタイルで見ていきます。カウンターはこのような感じになります。
class Counter(context: ActorContext[Command]) extends AbstractBehavior[Command](context) { import Counter._ private var _count = 0 override def onMessage(msg: Command): Behavior[Command] = msg match { case Increment => _count += 1 this case Decrement => _count -= 1 this case ReadCount(replyTo) => replyTo ! CountResponse(_count) this } } object Counter { def apply(): Behavior[Command] = Behaviors.setup(context => new Counter(context)) sealed trait Command case object Increment extends Command case object Decrement extends Command final case class ReadCount(replyTo: ActorRef[CountResponse]) extends Command final case class CountResponse(value: Int) }
Akka Classicでは、Actorは Actor
traitで表現されていましたが、Akka Typedでは Behavior
と ActorContext
で表現されます。
Behaviorは「受け取ったメッセージにどのように反応するか」を定義するもので、 Behavior[Command]
のように型引数をとります。これはそのBehaviorが受け取ることができるメッセージを表します。
受け取るメッセージの種類は複数であることが一般的なので、上の例のようにtraitとして表現されることが多いでしょう。
Behaviorが実行されるコンテキストを表すのが ActorContext
です。OOスタイルのActorはこのActorContextが渡された AbstractBehavior
をmixinすることで表現されます。
AbstractBehaviorが実装を要求するのが onMessage
メソッドです。このメソッドはメッセージを受け取り、次のメッセージをハンドリングする新たなBehaviorを返します。関数型らしい考え方ですね。
上では、onMessageは受けとった msg
に対するパターンマッチとして実装しています。
そのためsealed traitなメッセージの網羅が漏れている場合には、下記のようにコンパイラが警告をしてくれます。コンパイルオプションで-Xfatal-warnings
を指定してエラーにすることもできますね。
[warn] It would fail on the following input: Decrement [warn] override def onMessage(msg: Command): Behavior[Command] = msg match {
Behaviorのファクトリである Behaviors
には、いくつかの便利メソッドが用意されています。
- 今のBehaviorを再利用する
Behaviors.same
- 今のBehaviorを再利用しつつ、メッセージがハンドリングされていないことを伝える
Behaviors.unhandled
- Behaviorをterminateする
Behaviors.stopped
上の例では this
を返していますが、これは意味的には Behaviors.same
と同じようです。
他にもメッセージをスタッシュする Behaviors.withStash
や unhandledに似た Behaviors.empty
, Behaviors.ignore
などがあります。
また上の例にはありませんが、Actorのライフサイクルイベントを表す Signal
を受け取りBehaviorを返す onSignal
メソッドを実装することもできます。たとえば PreRestart
でログを書く、 PostStop
でなんらかのリソースを解放するなどがよくありそうなパターンです。
このCounterを使う側は次のようになります。Akka Classic版と同じく、2回のインクリメントと1回のデクリメントの後に結果をprintしてみます。
object CounterApp extends App { val system = ActorSystem(Guardian(), "system") implicit val timeout: Timeout = 3.seconds implicit val ec: ExecutionContext = system.executionContext implicit val scheduler: Scheduler = system.scheduler val counter: Future[ActorRef[Counter.Command]] = system.ask { replyTo => SpawnProtocol.Spawn( behavior = Counter(), name = "counter", props = Props.empty, replyTo = replyTo ) } for (counterRef <- counter) { counterRef ! Counter.Increment counterRef ! Counter.Increment counterRef ! Counter.Decrement counterRef .ask[Counter.CountResponse](Counter.ReadCount) .foreach(res => println(s"count: ${res.value}")) } system.terminate() } object Guardian { def apply(): Behavior[SpawnProtocol.Command] = Behaviors.setup(_ => SpawnProtocol()) }
まず注目したいのが ActorSystem
の生成方法です。このActorSystemはAkka ClassicのActorSystemとはパッケージが異なる別物ですが、Actor階層の最上位を示すものである点は同じです。
第一引数で受け取るのは、user guardian
となるBehaviorです。
Akka Classicではuser guardianとなるActorは初期化時に内部で生成されていましたが、Typedでは自身で明示的に生成する必要があります。上の例では特にguardianとしての仕事は何もしていませんが、ちゃんとやるなら Behaviors.supervisor
などを使った異常系のハンドリングを実装する必要がありそうです。
またAkka Classicでは ActorSystem#actorOf
を通じてActorSystemの外部から子Actorを生成できましたが、Typedではそれはできません。その代わりに、子Actorを生成するための SpawnProtocol.Spawn
というメッセージが用意されています。このメッセージをaskすると生成された子Actorの ActorRef
がFutureで返却されるので、そこに対して子Actor向けのメッセージを送信する、という流れになります。
このように、Akka TypedではActorSystemの外の世界から結果を問い合わせるのには少々工夫が必要です。ドキュメントでは Request-Response with ask from outside an Actor パターンと呼ばれています。
ドキュメントには他にも Fire and Forgetや Per session child Actorなど、参考になるパターンが丁寧に解説されているので、ぜひ一読をおすすめします。
他にも目についた違いは以下のとおりです。
- Typedには
sender()
がありません。なのでレスポンスが欲しい場合は自身のActorRefをリクエストに含め、受信側がそこに対してレスポンスをtellします。 - Typedには
parent
がありません。参照したい場合はBehaviorの引数に親ActorのActorRefを受け取るようにします。 - Typedには
PoisonPill
がありません。Actorを停止させるメッセージを受け取るBehaviorを明示的に実装するようにします。 - ClassicでのSupervisorStrategyのデフォルト戦略は
Restart
でしたが、TypedではStop
です。 デフォルト戦略を利用しているActorをTypedに切り替える際には注意が必要です。
なお、Counterを Functional style
で書くと以下のようになり、classが不要になります。状態を関数の引数として引き回す感じですね。個人的にはこちらの方が好みかもしれません。
object Counter { def apply(): Behavior[Command] = counter(0) private def counter(count: Int): Behavior[Command] = Behaviors.receive { (_, msg) => msg match { case Increment => counter(count + 1) case Decrement => counter(count - 1) case ReadCount(replyTo) => replyTo ! CountResponse(count) Behaviors.same } } sealed trait Command case object Increment extends Command case object Decrement extends Command final case class ReadCount(replyTo: ActorRef[CountResponse]) extends Command final case class CountResponse(value: Int) }
Typed と Classic の相互運用
分散処理基盤など、Akkaを利用するアプリケーションは比較的大規模になる傾向がありそうです。
上で見てきたように、ActorSystemやActorRefは、Akka ClassicとTypedで型が異なります。
となると、いきなり全てのActorをTyped版に置き換えるのも難しそうです。段階的な移行の容易さが気になります。
うれしいことに、TypedとClassicの相互運用性のための機能も提供されています。
以下の例はMaster-WorkerパターンのWorkerをTypedにしたものです。
... import akka.actor.typed.scaladsl.adapter._ import akka.{actor => classic} ... object IntegratedApp extends App { val system = ActorSystem("count-app") implicit val ec: ExecutionContext = system.dispatcher implicit val timeout: Timeout = 3.seconds val counter = system.actorOf(CountMaster.props, "master") counter ! CountMaster.Increment counter ! CountMaster.Increment counter ! CountMaster.Decrement counter ! CountMaster.PrintCount system.terminate() } class CountMaster extends classic.Actor with classic.ActorLogging { import CountMaster._ import CountWorker.CountResponse implicit val timeout: Timeout = 3.seconds implicit val ec: ExecutionContext = context.system.dispatcher implicit val scheduler: Scheduler = context.system.scheduler.toTyped private val worker = context.system.spawn(CountWorker(), "count-worker") override def receive: Receive = { case Increment => worker ! CountWorker.Plus(1) case Decrement => worker ! CountWorker.Minus(1) case PrintCount => worker .ask[CountResponse](replyTo => CountWorker.ReadCount(replyTo)) .foreach(res => log.info("count: {}", res.value)) } } object CountMaster { def props: Props = Props(new CountMaster) sealed trait MasterCommand case object Increment extends MasterCommand case object Decrement extends MasterCommand case object PrintCount extends MasterCommand } class CountWorker(context: ActorContext[WorkerCommand]) extends AbstractBehavior[WorkerCommand](context) { import CountWorker._ private var _count = 0 override def onMessage(msg: WorkerCommand): Behavior[WorkerCommand] = msg match { case Plus(n) => _count = _count + n this case Minus(n) => _count = _count - n this case ReadCount(replyTo) => replyTo ! CountResponse(_count) this } } object CountWorker { def apply(): Behavior[WorkerCommand] = Behaviors.setup(context => new CountWorker(context)) sealed trait WorkerCommand final case class Plus(n: Int) extends WorkerCommand final case class Minus(n: Int) extends WorkerCommand case class ReadCount(replyTo: ActorRef[CountResponse]) extends WorkerCommand case class CountResponse(value: Int) }
肝になるのは akka.actor.typed.scaladsl.adapter._
のimportです。
このpackage objectには拡張メソッドが定義されており、ActorSystemのClassic/Typedの変換や、一方の親Actorからもう一方の子Actorの生成などが可能になります。
上の例では、Masterの context.system.spawn(CountWorker(), "count-worker")
の部分が暗黙的な変換を利用している箇所です。
akka.actor.ActorContext
はBehaviorから子Actorを生成する spawn
メソッドを持ちませんが、ClassicActorSystemOps
へのimplicit conversionによってspawnの呼び出しが可能になっています。
その他にも、CoexistingのドキュメントにはTypedからClassicな子Actorの生成や子Actorの監視などのサンプルコードがあります。
所感
Akkaアプリケーションの規模が大きくなってくると、ハンドリングするメッセージの漏れやaskした結果のcastに若干不安を感じていたので、型安全にメッセージを扱えるのは大変ありがたいです。
いままでのAkka Typedのサンプルでよく見たFunctional styleは若干の学習コストが要りそうですが、Object-oriented styleならAkka Classicに慣れた人でもすぐ容易に書き始められるのではないでしょうか。
またClassic/Typedの相互運用も考えられている点も嬉しいですね。Typedへの移行を前向きに検討できそうです。
Typedでは若干の記述が増えるものの、型安全の恩恵を考えるとお釣りが来るくらいなのではないかなと思います。
今回は触れられなかったのですが、SupervisorやAkka Streams、クラスタリングなども引き継ぎ調査していこうと思います。
完全なサンプルコードはGitHubにあります。