Candy, Vitamin or Painkiller

He's half man and half machine rides the metal monster

2.6.0-RC1でstableになったAkka Typedを試してみる

Scalaの並行・分散処理ライブラリの Akka には、メッセージパッシングをより型安全に行うことのできる Akka Typed という機能があります。
先週リリースされた 2.6.0-RC1 にてAPIがstableになりました(production readyは2.5.22から)。

discuss.lightbend.com

今日は旧来のActor( Akka Classic )との比較や、TypedとClassicの相互運用を中心に軽く試してみたいと思います。

Classic v.s. Typed

まずはAkka ClassicとTypedの比較からです。

Akka Classic

Akka ClassicのActorはどんなメッセージでも受け取ることができます。柔軟ではありますが、静的型付けの恩恵を受けられないのもまた事実です。
数をカウントする簡単なアプリケーションで確認してみます。

カウンターはこのような感じです。カウントを操作するメッセージと、カウントの返却を要求するメッセージを受け付けます。

class Counter extends Actor {

  private var _count = 0 // 内部状態

  override def receive: Receive = {
    case Increment =>
      _count += 1

    case ReadCount =>
      sender() ! CountResponse(_count)
  }
}

object Counter {
  def props: Props = Props(new Counter)

  // カウンターが受け付けるメッセージ
  sealed trait Command
  case object Increment extends Command
  case object Decrement extends Command
  case object ReadCount extends Command

  // カウンターのレスポンス
  final case class CountResponse(value: Int)
}

このカウンターを利用してみます。2回のインクリメント、1回のデクリメント後にカウントの返却を要求し、結果をprintしているだけです。

object CounterApp extends App {
  val system = ActorSystem("count-app")
  implicit val ec: ExecutionContext = system.dispatcher
  implicit val timeout: Timeout = Timeout(3.seconds)

  // カウンターのActorを生成
  val counter = system.actorOf(Counter.props)

  counter ! Counter.Increment
  counter ! Counter.Increment
  counter ! Counter.Decrement

  counter.ask(Counter.ReadCount)
    .mapTo[CountResponse] // Future[Any]をFuture[CountResponse]に変換
    .foreach(res => println(s"count: ${res.value}"))

  system.terminate()
}

このコードはコンパイルできますし、実行もできます。
しかし正しく動きません。インクリメントを2回、デクリメントを1回行ったのにカウンターの値は 2 になります。 Decrement のメッセージをハンドリングをうっかり忘れているためです。
カウンターが受け取るメッセージは sealed trait Command を継承したADTになっていますが、それでも網羅性は検査できません。

また ask した結果の Future[Any]Future#mapTo[T] で変換しているところもちょっと怖い感じです。Future[CountResponse] に変換して問題ないかを静的に検査することはできません((そのため、ask ではなく tell しあうようにすることが多いような気がします))。

Akka Typed

Akka Classicでの問題点は、

  • Actorが受信するメッセージのハンドリングが網羅的かを検査できない
  • Actorからのレスポンスメッセージをリクエスト側でキャストする必要がある

の2点でした。

Akka Typedでは上をクリアするために、 ActorRef が受信できるメッセージの型引数をとるようになっています。これも簡単なカウンターアプリケーションで見てみましょう。

Akka TypedのAPIには2種類のスタイルがあります。

  • Functional style: immutableなstateを受け取り Behaviorを返す関数を再帰的に呼ぶことで振る舞いを表現するスタイル
  • Object Oriented style: 内部にmutableなstateを持つ、Akka Classicに似た記法を持つスタイル

まずは後者のOOスタイルで見ていきます。カウンターはこのような感じになります。

class Counter(context: ActorContext[Command]) extends AbstractBehavior[Command](context) {
  import Counter._

  private var _count = 0

  override def onMessage(msg: Command): Behavior[Command] = msg match {
    case Increment =>
      _count += 1
      this

    case Decrement =>
      _count -= 1
      this

    case ReadCount(replyTo) =>
      replyTo ! CountResponse(_count)
      this
  }
}

object Counter {
  def apply(): Behavior[Command] =
    Behaviors.setup(context => new Counter(context))

  sealed trait Command
  case object Increment extends Command
  case object Decrement extends Command
  final case class ReadCount(replyTo: ActorRef[CountResponse]) extends Command

  final case class CountResponse(value: Int)
}

Akka Classicでは、Actorは Actor traitで表現されていましたが、Akka Typedでは BehaviorActorContext で表現されます。

Behaviorは「受け取ったメッセージにどのように反応するか」を定義するもので、 Behavior[Command] のように型引数をとります。これはそのBehaviorが受け取ることができるメッセージを表します。
受け取るメッセージの種類は複数であることが一般的なので、上の例のようにtraitとして表現されることが多いでしょう。

Behaviorが実行されるコンテキストを表すのが ActorContext です。OOスタイルのActorはこのActorContextが渡された AbstractBehavior をmixinすることで表現されます。

AbstractBehaviorが実装を要求するのが onMessage メソッドです。このメソッドはメッセージを受け取り、次のメッセージをハンドリングする新たなBehaviorを返します。関数型らしい考え方ですね。

上では、onMessageは受けとった msg に対するパターンマッチとして実装しています。
そのためsealed traitなメッセージの網羅が漏れている場合には、下記のようにコンパイラが警告をしてくれます。コンパイルオプションで-Xfatal-warningsを指定してエラーにすることもできますね。

[warn] It would fail on the following input: Decrement
[warn]   override def onMessage(msg: Command): Behavior[Command] = msg match {

Behaviorのファクトリである Behaviors には、いくつかの便利メソッドが用意されています。

  • 今のBehaviorを再利用する Behaviors.same
  • 今のBehaviorを再利用しつつ、メッセージがハンドリングされていないことを伝える Behaviors.unhandled
  • Behaviorをterminateする Behaviors.stopped

上の例では this を返していますが、これは意味的には Behaviors.same と同じようです。
他にもメッセージをスタッシュする Behaviors.withStash や unhandledに似た Behaviors.empty , Behaviors.ignore などがあります。

また上の例にはありませんが、Actorのライフサイクルイベントを表す Signal を受け取りBehaviorを返す onSignal メソッドを実装することもできます。たとえば PreRestart でログを書く、 PostStop でなんらかのリソースを解放するなどがよくありそうなパターンです。

このCounterを使う側は次のようになります。Akka Classic版と同じく、2回のインクリメントと1回のデクリメントの後に結果をprintしてみます。

object CounterApp extends App {

  val system = ActorSystem(Guardian(), "system")

  implicit val timeout: Timeout = 3.seconds
  implicit val ec: ExecutionContext = system.executionContext
  implicit val scheduler: Scheduler = system.scheduler

  val counter: Future[ActorRef[Counter.Command]] =
    system.ask { replyTo =>
      SpawnProtocol.Spawn(
        behavior = Counter(),
        name = "counter",
        props = Props.empty,
        replyTo = replyTo
      )
    }

  for (counterRef <- counter) {
    counterRef ! Counter.Increment
    counterRef ! Counter.Increment
    counterRef ! Counter.Decrement
    counterRef
      .ask[Counter.CountResponse](Counter.ReadCount)
      .foreach(res => println(s"count: ${res.value}"))
  }

  system.terminate()
}

object Guardian {

  def apply(): Behavior[SpawnProtocol.Command] =
    Behaviors.setup(_ => SpawnProtocol())
}

まず注目したいのが ActorSystem の生成方法です。このActorSystemはAkka ClassicのActorSystemとはパッケージが異なる別物ですが、Actor階層の最上位を示すものである点は同じです。

第一引数で受け取るのは、user guardian となるBehaviorです。
Akka Classicではuser guardianとなるActorは初期化時に内部で生成されていましたが、Typedでは自身で明示的に生成する必要があります。上の例では特にguardianとしての仕事は何もしていませんが、ちゃんとやるなら Behaviors.supervisor などを使った異常系のハンドリングを実装する必要がありそうです。

またAkka Classicでは ActorSystem#actorOf を通じてActorSystemの外部から子Actorを生成できましたが、Typedではそれはできません。その代わりに、子Actorを生成するための SpawnProtocol.Spawn というメッセージが用意されています。このメッセージをaskすると生成された子Actorの ActorRef がFutureで返却されるので、そこに対して子Actor向けのメッセージを送信する、という流れになります。

このように、Akka TypedではActorSystemの外の世界から結果を問い合わせるのには少々工夫が必要です。ドキュメントでは Request-Response with ask from outside an Actor パターンと呼ばれています。
ドキュメントには他にも Fire and Forgetや Per session child Actorなど、参考になるパターンが丁寧に解説されているので、ぜひ一読をおすすめします。

他にも目についた違いは以下のとおりです。

  • Typedには sender() がありません。なのでレスポンスが欲しい場合は自身のActorRefをリクエストに含め、受信側がそこに対してレスポンスをtellします。
  • Typedには parent がありません。参照したい場合はBehaviorの引数に親ActorのActorRefを受け取るようにします。
  • Typedには PoisonPill がありません。Actorを停止させるメッセージを受け取るBehaviorを明示的に実装するようにします。
  • ClassicでのSupervisorStrategyのデフォルト戦略は Restart でしたが、Typedでは Stop です。 デフォルト戦略を利用しているActorをTypedに切り替える際には注意が必要です。

なお、Counterを Functional style で書くと以下のようになり、classが不要になります。状態を関数の引数として引き回す感じですね。個人的にはこちらの方が好みかもしれません。

object Counter {
  def apply(): Behavior[Command] = counter(0)

  private def counter(count: Int): Behavior[Command] =
    Behaviors.receive { (_, msg) =>
      msg match {
        case Increment =>
          counter(count + 1)
        case Decrement =>
          counter(count - 1)
        case ReadCount(replyTo) =>
          replyTo ! CountResponse(count)
          Behaviors.same
      }
    }

  sealed trait Command
  case object Increment extends Command
  case object Decrement extends Command
  final case class ReadCount(replyTo: ActorRef[CountResponse]) extends Command

  final case class CountResponse(value: Int)
}

Typed と Classic の相互運用

分散処理基盤など、Akkaを利用するアプリケーションは比較的大規模になる傾向がありそうです。
上で見てきたように、ActorSystemやActorRefは、Akka ClassicとTypedで型が異なります。
となると、いきなり全てのActorをTyped版に置き換えるのも難しそうです。段階的な移行の容易さが気になります。

うれしいことに、TypedとClassicの相互運用性のための機能も提供されています。
以下の例はMaster-WorkerパターンのWorkerをTypedにしたものです。

...
import akka.actor.typed.scaladsl.adapter._
import akka.{actor => classic}
...

object IntegratedApp extends App {
  val system = ActorSystem("count-app")
  implicit val ec: ExecutionContext = system.dispatcher
  implicit val timeout: Timeout = 3.seconds

  val counter = system.actorOf(CountMaster.props, "master")

  counter ! CountMaster.Increment
  counter ! CountMaster.Increment
  counter ! CountMaster.Decrement
  counter ! CountMaster.PrintCount

  system.terminate()
}

class CountMaster extends classic.Actor with classic.ActorLogging {
  import CountMaster._
  import CountWorker.CountResponse

  implicit val timeout: Timeout = 3.seconds
  implicit val ec: ExecutionContext = context.system.dispatcher
  implicit val scheduler: Scheduler = context.system.scheduler.toTyped

  private val worker = context.system.spawn(CountWorker(), "count-worker")

  override def receive: Receive = {
    case Increment =>
      worker ! CountWorker.Plus(1)

    case Decrement =>
      worker ! CountWorker.Minus(1)

    case PrintCount =>
      worker
        .ask[CountResponse](replyTo => CountWorker.ReadCount(replyTo))
        .foreach(res => log.info("count: {}", res.value))
  }
}

object CountMaster {
  def props: Props = Props(new CountMaster)

  sealed trait MasterCommand
  case object Increment extends MasterCommand
  case object Decrement extends MasterCommand
  case object PrintCount extends MasterCommand
}

class CountWorker(context: ActorContext[WorkerCommand]) 
  extends AbstractBehavior[WorkerCommand](context) {

  import CountWorker._

  private var _count = 0

  override def onMessage(msg: WorkerCommand): Behavior[WorkerCommand] = msg match {
    case Plus(n) =>
      _count = _count + n
      this

    case Minus(n) =>
      _count = _count - n
      this

    case ReadCount(replyTo) =>
      replyTo ! CountResponse(_count)
      this
  }
}

object CountWorker {
  def apply(): Behavior[WorkerCommand] =
    Behaviors.setup(context => new CountWorker(context))

  sealed trait WorkerCommand
  final case class Plus(n: Int) extends WorkerCommand
  final case class Minus(n: Int) extends WorkerCommand
  case class ReadCount(replyTo: ActorRef[CountResponse]) extends WorkerCommand

  case class CountResponse(value: Int)
}

肝になるのは akka.actor.typed.scaladsl.adapter._ のimportです。
このpackage objectには拡張メソッドが定義されており、ActorSystemのClassic/Typedの変換や、一方の親Actorからもう一方の子Actorの生成などが可能になります。

上の例では、Masterの context.system.spawn(CountWorker(), "count-worker") の部分が暗黙的な変換を利用している箇所です。
akka.actor.ActorContext はBehaviorから子Actorを生成する spawn メソッドを持ちませんが、ClassicActorSystemOps へのimplicit conversionによってspawnの呼び出しが可能になっています。

その他にも、CoexistingのドキュメントにはTypedからClassicな子Actorの生成や子Actorの監視などのサンプルコードがあります。

所感

Akkaアプリケーションの規模が大きくなってくると、ハンドリングするメッセージの漏れやaskした結果のcastに若干不安を感じていたので、型安全にメッセージを扱えるのは大変ありがたいです。
いままでのAkka Typedのサンプルでよく見たFunctional styleは若干の学習コストが要りそうですが、Object-oriented styleならAkka Classicに慣れた人でもすぐ容易に書き始められるのではないでしょうか。
またClassic/Typedの相互運用も考えられている点も嬉しいですね。Typedへの移行を前向きに検討できそうです。

Typedでは若干の記述が増えるものの、型安全の恩恵を考えるとお釣りが来るくらいなのではないかなと思います。

今回は触れられなかったのですが、SupervisorやAkka Streams、クラスタリングなども引き継ぎ調査していこうと思います。

完全なサンプルコードはGitHubにあります。

github.com