This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch unpersistent in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 0be5d0546db2ee1aa0637f4eca9184099f6f2242 Author: He-Pin <[email protected]> AuthorDate: Sat Nov 8 16:16:04 2025 +0800 feat: Add unpersistent versions of persistent behaviors --- .../typed/AccountExampleUnpersistentDocTest.java | 150 +++++++ .../ExternalShardAllocationCompileOnlyTest.java | 5 +- .../typed/AccountExampleUnpersistentDocSpec.scala | 102 +++++ docs/src/main/paradox/typed/persistence-testing.md | 48 ++- docs/src/main/paradox/typed/testing-sync.md | 2 +- .../testkit/internal/Unpersistent.scala | 466 +++++++++++++++++++++ .../testkit/javadsl/UnpersistentBehavior.scala | 149 +++++++ .../testkit/scaladsl/UnpersistentBehavior.scala | 131 ++++++ .../scaladsl/UnpersistentDurableStateSpec.scala | 251 +++++++++++ .../scaladsl/UnpersistentEventSourcedSpec.scala | 251 +++++++++++ 10 files changed, 1548 insertions(+), 7 deletions(-) diff --git a/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java new file mode 100644 index 0000000000..66c284f946 --- /dev/null +++ b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java @@ -0,0 +1,150 @@ +package jdocs.org.apache.pekko.cluster.sharding.typed; + +import org.apache.pekko.Done; +import org.apache.pekko.pattern.StatusReply; +import org.apache.pekko.testkit.package$; +import org.scalatestplus.junit.JUnitSuite; + +import static jdocs.org.apache.pekko.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity; +import static org.junit.Assert.*; + +// #test +import java.math.BigDecimal; +import org.apache.pekko.actor.testkit.typed.javadsl.BehaviorTestKit; +import org.apache.pekko.actor.testkit.typed.javadsl.ReplyInbox; +import org.apache.pekko.actor.testkit.typed.javadsl.StatusReplyInbox; +import org.apache.pekko.actor.testkit.typed.javadsl.TestInbox; +import org.apache.pekko.persistence.testkit.javadsl.UnpersistentBehavior; +import org.apache.pekko.persistence.testkit.javadsl.PersistenceEffect; +import org.apache.pekko.persistence.typed.PersistenceId; +import org.junit.Test; + +public class AccountExampleUnpersistentDocTest + // #test + extends JUnitSuite +// #test +{ + @Test + public void createWithEmptyBalance() { + UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> + unpersistent = emptyAccount(); + + BehaviorTestKit<AccountEntity.Command> testkit = unpersistent.getBehaviorTestKit(); + + StatusReplyInbox<Done> ackInbox = testkit.runAskWithStatus(AccountEntity.CreateAccount::new); + + ackInbox.expectValue(Done.getInstance()); + unpersistent.getEventProbe().expectPersisted(AccountEntity.AccountCreated.INSTANCE); + + // internal state is only exposed by the behavior via responses to messages or if it happens + // to snapshot. This particular behavior never snapshots, so we query within the actor's + // protocol + assertFalse(unpersistent.getSnapshotProbe().hasEffects()); + + ReplyInbox<AccountEntity.CurrentBalance> currentBalanceInbox = + testkit.runAsk(AccountEntity.GetBalance::new); + + assertEquals(BigDecimal.ZERO, currentBalanceInbox.receiveReply().balance); + } + + @Test + public void handleDepositAndWithdraw() { + UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> + unpersistent = openedAccount(); + + BehaviorTestKit<AccountEntity.Command> testkit = unpersistent.getBehaviorTestKit(); + BigDecimal currentBalance; + + testkit + .runAskWithStatus( + Done.class, replyTo -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)) + .expectValue(Done.getInstance()); + + assertEquals( + BigDecimal.valueOf(100), + unpersistent + .getEventProbe() + .expectPersistedClass(AccountEntity.Deposited.class) + .persistedObject() + .amount); + + currentBalance = + testkit + .runAsk(AccountEntity.CurrentBalance.class, AccountEntity.GetBalance::new) + .receiveReply() + .balance; + + assertEquals(BigDecimal.valueOf(100), currentBalance); + + testkit + .runAskWithStatus( + Done.class, replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo)) + .expectValue(Done.getInstance()); + + // can save the persistence effect for in-depth inspection + PersistenceEffect<AccountEntity.Withdrawn> withdrawEffect = + unpersistent.getEventProbe().expectPersistedClass(AccountEntity.Withdrawn.class); + assertEquals(BigDecimal.valueOf(10), withdrawEffect.persistedObject().amount); + assertEquals(3L, withdrawEffect.sequenceNr()); + assertTrue(withdrawEffect.tags().isEmpty()); + + currentBalance = + testkit + .runAsk(AccountEntity.CurrentBalance.class, AccountEntity.GetBalance::new) + .receiveReply() + .balance; + + assertEquals(BigDecimal.valueOf(90), currentBalance); + } + + @Test + public void rejectWithdrawOverdraft() { + UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> + unpersistent = accountWithBalance(BigDecimal.valueOf(100)); + + BehaviorTestKit<AccountEntity.Command> testkit = unpersistent.getBehaviorTestKit(); + + testkit + .runAskWithStatus( + Done.class, replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo)) + .expectErrorMessage("not enough funds to withdraw 110"); + + assertFalse(unpersistent.getEventProbe().hasEffects()); + } + + // #test + private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> + emptyAccount() { + return + // #unpersistent-behavior + UnpersistentBehavior.fromEventSourced( + AccountEntity.create("1", PersistenceId.of("Account", "1")), + null, // use the initial state + 0 // initial sequence number + ); + // #unpersistent-behavior + } + + private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> + openedAccount() { + return + // #unpersistent-behavior-provided-state + UnpersistentBehavior.fromEventSourced( + AccountEntity.create("1", PersistenceId.of("Account", "1")), + new AccountEntity.EmptyAccount() + .openedAccount(), // duplicate the event handler for AccountCreated on an EmptyAccount + 1 // assume that CreateAccount was the first command + ); + // #unpersistent-behavior-provided-state + } + + private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> + accountWithBalance(BigDecimal balance) { + return UnpersistentBehavior.fromEventSourced( + AccountEntity.create("1", PersistenceId.of("Account", "1")), + new AccountEntity.OpenedAccount(balance), + 2); + } + // #test +} +// #test diff --git a/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java index fd6947841b..1d2c105ff8 100644 --- a/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java +++ b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java @@ -28,6 +28,7 @@ import org.apache.pekko.cluster.sharding.typed.ShardingEnvelope; import org.apache.pekko.cluster.sharding.typed.javadsl.ClusterSharding; import org.apache.pekko.cluster.sharding.typed.javadsl.Entity; import org.apache.pekko.cluster.sharding.typed.javadsl.EntityTypeKey; +import org.apache.pekko.util.Timeout; public class ExternalShardAllocationCompileOnlyTest { @@ -43,8 +44,8 @@ public class ExternalShardAllocationCompileOnlyTest { sharding.init( Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId())) .withAllocationStrategy( - ExternalShardAllocationStrategy.create( - system, typeKey.name(), Duration.ofSeconds(5)))); + new ExternalShardAllocationStrategy( + system, typeKey.name(), Timeout.create(Duration.ofSeconds(5))))); // #entity // #client diff --git a/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala b/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala new file mode 100644 index 0000000000..546a113615 --- /dev/null +++ b/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala @@ -0,0 +1,102 @@ +package docs.org.apache.pekko.cluster.sharding.typed + +import org.apache.pekko +import pekko.Done +import pekko.pattern.StatusReply +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike + +// #test +import pekko.actor.testkit.typed.scaladsl.TestInbox +import pekko.persistence.testkit.scaladsl.UnpersistentBehavior +import pekko.persistence.typed.PersistenceId + +class AccountExampleUnpersistentDocSpec + extends AnyWordSpecLike + // #test + with Matchers + // #test + { + // #test + import AccountExampleWithEventHandlersInState.AccountEntity + // #test + "Account" must { + "be created with zero balance" in { + val replyToInbox = TestInbox[StatusReply[Done]]() + val getBalanceInbox = TestInbox[AccountEntity.CurrentBalance]() + + onAnEmptyAccount { (testkit, eventProbe, snapshotProbe) => + testkit.run(AccountEntity.CreateAccount(replyToInbox.ref)) + replyToInbox.expectMessage(StatusReply.Ack) + + eventProbe.expectPersisted(AccountEntity.AccountCreated) + + // internal state is only exposed by the behavior via responses to messages or if it happens + // to snapshot. This particular behavior never snapshots, so we query within the actor's + // protocol + snapshotProbe.hasEffects shouldBe false + + testkit.run(AccountEntity.GetBalance(getBalanceInbox.ref)) + + getBalanceInbox.receiveMessage().balance shouldBe 0 + } + } + + "handle Deposit and Withdraw" in { + val replyToInbox = TestInbox[StatusReply[Done]]() + val getBalanceInbox = TestInbox[AccountEntity.CurrentBalance]() + + onAnOpenedAccount { (testkit, eventProbe, _) => + testkit.run(AccountEntity.Deposit(100, replyToInbox.ref)) + + replyToInbox.expectMessage(StatusReply.Ack) + eventProbe.expectPersisted(AccountEntity.Deposited(100)) + + testkit.run(AccountEntity.Withdraw(10, replyToInbox.ref)) + + replyToInbox.expectMessage(StatusReply.Ack) + eventProbe.expectPersisted(AccountEntity.Withdrawn(10)) + + testkit.run(AccountEntity.GetBalance(getBalanceInbox.ref)) + + getBalanceInbox.receiveMessage().balance shouldBe 90 + } + } + + "reject Withdraw overdraft" in { + val replyToInbox = TestInbox[StatusReply[Done]]() + + onAnAccountWithBalance(100) { (testkit, eventProbe, _) => + testkit.run(AccountEntity.Withdraw(110, replyToInbox.ref)) + + replyToInbox.receiveMessage().isError shouldBe true + eventProbe.hasEffects shouldBe false + } + } + } + // #test + + // #unpersistent-behavior + private def onAnEmptyAccount + : UnpersistentBehavior.EventSourced[AccountEntity.Command, AccountEntity.Event, AccountEntity.Account] = + UnpersistentBehavior.fromEventSourced(AccountEntity("1", PersistenceId("Account", "1"))) + // #unpersistent-behavior + + // #unpersistent-behavior-provided-state + private def onAnOpenedAccount + : UnpersistentBehavior.EventSourced[AccountEntity.Command, AccountEntity.Event, AccountEntity.Account] = + UnpersistentBehavior.fromEventSourced( + AccountEntity("1", PersistenceId("Account", "1")), + Some( + AccountEntity.EmptyAccount.applyEvent(AccountEntity.AccountCreated) -> // reuse the event handler + 1L // assume that CreateAccount was the first command + )) + // #unpersistent-behavior-provided-state + + private def onAnAccountWithBalance(balance: BigDecimal) = + UnpersistentBehavior.fromEventSourced( + AccountEntity("1", PersistenceId("Account", "1")), + Some(AccountEntity.OpenedAccount(balance) -> 2L)) + // #test +} +// #test diff --git a/docs/src/main/paradox/typed/persistence-testing.md b/docs/src/main/paradox/typed/persistence-testing.md index 46d828c153..f73c9b67bc 100644 --- a/docs/src/main/paradox/typed/persistence-testing.md +++ b/docs/src/main/paradox/typed/persistence-testing.md @@ -19,9 +19,49 @@ To use Pekko Persistence TestKit, add the module to your project: @@project-info{ projectId="persistence-testkit" } -## Unit testing +## Unit testing with the BehaviorTestKit -**Note!** The `EventSourcedBehaviorTestKit` is a new feature, api may have changes breaking source compatibility in future versions. +**Note!** The `UnpersistentBehavior` is a new feature: the API may have changes breaking source compatibility in future versions. + +Unit testing of `EventSourcedBehavior` can be performed by converting it into an @apidoc[UnpersistentBehavior]. Instead of +persisting events and snapshots, the `UnpersistentBehavior` exposes @apidoc[PersistenceProbe]s for events and snapshots which +can be asserted on. + +Scala +: @@snip [AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala) { #unpersistent-behavior } + +Java +: @@snip [AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java) { #unpersistent-behavior } + +The `UnpersistentBehavior` can be initialized with arbitrary states: + +Scala +: @@snip [AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala) { #unpersistent-behavior-provided-state } + +Java +: @@snip [AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java) { #unpersistent-behavior-provided-state } + +The `UnpersistentBehavior` is especially well-suited to the synchronous @ref:[`BehaviorTestKit`](testing-sync.md#synchronous-behavior-testing): +the `UnpersistentBehavior` can directly construct a `BehaviorTestKit` wrapping the behavior. When commands are run by `BehaviorTestKit`, +they are processed in the calling thread (viz. the test suite), so when the run returns, the suite can be sure that the message has been +fully processed. The internal state of the `EventSourcedBehavior` is not exposed to the suite except to the extent that it affects how +the behavior responds to commands or the events it persists (in addition, any snapshots made by the behavior are available through a +`PersistenceProbe`). + +A full test for the `AccountEntity`, which is shown in the @ref:[Persistence Style Guide](persistence-style.md) might look like: + +Scala +: @@snip [AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala) { #test } + +Java +: @@snip [AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java) { #test } + +`UnpersistentBehavior` does not require any configuration. It therefore does not verify the serialization of commands, events, or state. +If using this style, it is advised to independently test serialization for those classes. + +## Unit testing with the the ActorTestKit and EventSourcedBehaviorTestKit + +**Note!** The `EventSourcedBehaviorTestKit` is a new feature: the API may have changes breaking source compatibility in future versions. Unit testing of `EventSourcedBehavior` can be done with the @apidoc[EventSourcedBehaviorTestKit]. It supports running one command at a time and you can assert that the synchronously returned result is as expected. The result contains the @@ -35,7 +75,7 @@ Scala : @@snip [AccountExampleDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocSpec.scala) { #testkit } Java -: @@snip [AccountExampleDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocTest.java) { #testkit } +: @@snip [AccountExampleDocTest.java](/cluster-sharding-typed/src/test/java/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocTest.java) { #testkit } A full test for the `AccountEntity`, which is shown in the @ref:[Persistence Style Guide](persistence-style.md), may look like this: @@ -43,7 +83,7 @@ Scala : @@snip [AccountExampleDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocSpec.scala) { #test } Java -: @@snip [AccountExampleDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocTest.java) { #test } +: @@snip [AccountExampleDocTest.java](/cluster-sharding-typed/src/test/java/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocTest.java) { #test } Serialization of commands, events and state are verified automatically. The serialization checks can be customized with the `SerializationSettings` when creating the `EventSourcedBehaviorTestKit`. By default, diff --git a/docs/src/main/paradox/typed/testing-sync.md b/docs/src/main/paradox/typed/testing-sync.md index f496bb76d9..dd8eae0c0c 100644 --- a/docs/src/main/paradox/typed/testing-sync.md +++ b/docs/src/main/paradox/typed/testing-sync.md @@ -12,7 +12,7 @@ limitations: * Spawning of @scala[`Future`]@java[`CompletionStage`] or other asynchronous task and you rely on a callback to complete before observing the effect you want to test. * Usage of scheduler is not supported. -* `EventSourcedBehavior` can't be tested. +* `EventSourcedBehavior` can't be fully tested, but it is possible to @ref:[test the core functionality](persistence-testing.md#unit-testing-with-the-behaviortestkit) * Interactions with other actors must be stubbed. * Blackbox testing style. * Supervision is not supported. diff --git a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala new file mode 100644 index 0000000000..de9077844a --- /dev/null +++ b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala @@ -0,0 +1,466 @@ +package org.apache.pekko.persistence.testkit.internal + +import java.util.concurrent.ConcurrentLinkedQueue + +import scala.annotation.tailrec +import scala.collection.immutable +import scala.collection.mutable.ListBuffer +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag + +import org.apache.pekko +import pekko.actor.typed.Behavior +import pekko.actor.typed.internal.BehaviorImpl.DeferredBehavior +import pekko.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors } +import pekko.annotation.InternalApi +import pekko.persistence.testkit.{ javadsl, scaladsl } +import pekko.persistence.typed.internal.EventSourcedBehaviorImpl +import pekko.persistence.typed.internal.Running.WithSeqNrAccessible +import pekko.persistence.typed.state.internal.DurableStateBehaviorImpl +import pekko.persistence.typed.state.internal.Running.WithRevisionAccessible +import pekko.util.ConstantFun.{ scalaAnyToUnit => doNothing } + +/** + * INTERNAL API + */ +@InternalApi +private[pekko] object Unpersistent { + + def eventSourced[Command, Event, State](behavior: Behavior[Command], fromStateAndSequenceNr: Option[(State, Long)])( + onEvent: (Event, Long, Set[String]) => Unit)(onSnapshot: (State, Long) => Unit): Behavior[Command] = { + @tailrec + def findEventSourcedBehavior( + b: Behavior[Command], + context: ActorContext[Command]): Option[EventSourcedBehaviorImpl[Command, Event, State]] = { + b match { + case es: EventSourcedBehaviorImpl[Command, _, _] => + Some(es.asInstanceOf[EventSourcedBehaviorImpl[Command, Event, State]]) + + case deferred: DeferredBehavior[Command] => + findEventSourcedBehavior(deferred(context), context) + + case _ => None + } + } + + Behaviors.setup[Command] { context => + findEventSourcedBehavior(behavior, context).fold { + throw new AssertionError("Did not find the expected EventSourcedBehavior") + } { esBehavior => + val (initialState, initialSequenceNr) = fromStateAndSequenceNr.getOrElse(esBehavior.emptyState -> 0L) + new WrappedEventSourcedBehavior(context, esBehavior, initialState, initialSequenceNr, onEvent, onSnapshot) + } + } + } + + def durableState[Command, State](behavior: Behavior[Command], fromState: Option[State])( + onPersist: (State, Long, String) => Unit): Behavior[Command] = { + + @tailrec + def findDurableStateBehavior( + b: Behavior[Command], + context: ActorContext[Command]): Option[DurableStateBehaviorImpl[Command, State]] = + b match { + case ds: DurableStateBehaviorImpl[Command, _] => + Some(ds.asInstanceOf[DurableStateBehaviorImpl[Command, State]]) + + case deferred: DeferredBehavior[Command] => findDurableStateBehavior(deferred(context), context) + case _ => None + } + + Behaviors.setup[Command] { context => + findDurableStateBehavior(behavior, context).fold { + throw new AssertionError("Did not find the expected DurableStateBehavior") + } { dsBehavior => + val initialState = fromState.getOrElse(dsBehavior.emptyState) + new WrappedDurableStateBehavior(context, dsBehavior, initialState, onPersist) + } + } + } + + private class WrappedEventSourcedBehavior[Command, Event, State]( + context: ActorContext[Command], + esBehavior: EventSourcedBehaviorImpl[Command, Event, State], + initialState: State, + initialSequenceNr: Long, + onEvent: (Event, Long, Set[String]) => Unit, + onSnapshot: (State, Long) => Unit) + extends AbstractBehavior[Command](context) + with WithSeqNrAccessible { + import pekko.persistence.typed.{ EventSourcedSignal, RecoveryCompleted, SnapshotCompleted, SnapshotMetadata } + import pekko.persistence.typed.internal._ + + override def currentSequenceNumber: Long = sequenceNr + + private def commandHandler = esBehavior.commandHandler + private def eventHandler = esBehavior.eventHandler + private def tagger = esBehavior.tagger + private def snapshotWhen = esBehavior.snapshotWhen + private def retention = esBehavior.retention + private def signalHandler = esBehavior.signalHandler + + private var sequenceNr: Long = initialSequenceNr + private var state: State = initialState + private val stashedCommands = ListBuffer.empty[Command] + + private def snapshotMetadata() = + SnapshotMetadata(esBehavior.persistenceId.toString, sequenceNr, System.currentTimeMillis()) + private def sendSignal(signal: EventSourcedSignal): Unit = + signalHandler.applyOrElse(state -> signal, doNothing) + + sendSignal(RecoveryCompleted) + + override def onMessage(cmd: Command): Behavior[Command] = { + var shouldSnapshot = false + var shouldUnstash = false + var shouldStop = false + + def snapshotRequested(evt: Event): Boolean = { + val snapshotFromRetention = retention match { + case DisabledRetentionCriteria => false + case s: SnapshotCountRetentionCriteriaImpl => s.snapshotWhen(sequenceNr) + case unexpected => throw new IllegalStateException(s"Unexpected retention criteria: $unexpected") + } + + snapshotFromRetention || snapshotWhen(state, evt, sequenceNr) + } + + def persistEvent(evt: Event): Unit = { + sequenceNr += 1 + onEvent(evt, sequenceNr, tagger(evt)) + state = eventHandler(state, evt) + shouldSnapshot = shouldSnapshot || snapshotRequested(evt) + } + + @tailrec + def applyEffects(curEffect: EffectImpl[Event, State], sideEffects: immutable.Seq[SideEffect[State]]): Unit = + curEffect match { + case CompositeEffect(eff: EffectImpl[Event, State], se) => + applyEffects(eff, se ++ sideEffects) + + case Persist(event) => + persistEvent(event) + sideEffect(sideEffects) + + case PersistAll(events) => + events.foreach(persistEvent) + sideEffect(sideEffects) + + // From outside of the behavior, these are equivalent: no state update + case _: PersistNothing.type | _: Unhandled.type => + sideEffect(sideEffects) + + case _: Stash.type => + stashedCommands.append(cmd) + sideEffect(sideEffects) + + case _ => + context.log.error("Unexpected effect {}, stopping", curEffect) + Behaviors.stopped + } + + def sideEffect(sideEffects: immutable.Seq[SideEffect[State]]): Unit = + sideEffects.iterator.foreach { effect => + effect match { + case _: Stop.type => shouldStop = true + case _: UnstashAll.type => shouldUnstash = true + case cb: Callback[_] => cb.sideEffect(state) + } + } + + applyEffects(commandHandler(state, cmd).asInstanceOf[EffectImpl[Event, State]], Nil) + + if (shouldSnapshot) { + onSnapshot(state, sequenceNr) + sendSignal(SnapshotCompleted(snapshotMetadata())) + } + + if (shouldStop) Behaviors.stopped + else if (shouldUnstash && stashedCommands.nonEmpty) { + val numStashed = stashedCommands.length + val thisWrappedBehavior = this + + Behaviors.setup { _ => + Behaviors.withStash(numStashed) { stash => + stashedCommands.foreach { sc => + stash.stash(sc) + () + } + + stashedCommands.remove(0, numStashed) + stash.unstashAll(thisWrappedBehavior) + } + } + } else this + } + } + + private class WrappedDurableStateBehavior[Command, State]( + context: ActorContext[Command], + dsBehavior: DurableStateBehaviorImpl[Command, State], + initialState: State, + onPersist: (State, Long, String) => Unit) + extends AbstractBehavior[Command](context) + with WithRevisionAccessible { + + import pekko.persistence.typed.state.{ DurableStateSignal, RecoveryCompleted } + import pekko.persistence.typed.state.internal._ + + override def currentRevision: Long = sequenceNr + + private def commandHandler = dsBehavior.commandHandler + private def signalHandler = dsBehavior.signalHandler + private val tag = dsBehavior.tag + + private var sequenceNr: Long = 0 + private var state: State = initialState + private val stashedCommands = ListBuffer.empty[Command] + + private def sendSignal(signal: DurableStateSignal): Unit = + signalHandler.applyOrElse(state -> signal, doNothing) + + sendSignal(RecoveryCompleted) + + override def onMessage(cmd: Command): Behavior[Command] = { + var shouldUnstash = false + var shouldStop = false + + def persistState(st: State): Unit = { + sequenceNr += 1 + onPersist(st, sequenceNr, tag) + state = st + } + + @tailrec + def applyEffects(curEffect: EffectImpl[State], sideEffects: immutable.Seq[SideEffect[State]]): Unit = + curEffect match { + case CompositeEffect(eff: EffectImpl[_], se) => + applyEffects(eff.asInstanceOf[EffectImpl[State]], se ++ sideEffects) + + case Persist(st) => + persistState(st) + sideEffect(sideEffects) + + case _: PersistNothing.type | _: Unhandled.type => + sideEffect(sideEffects) + + case _: Stash.type => + stashedCommands.append(cmd) + sideEffect(sideEffects) + + case _ => + context.log.error("Unexpected effect, stopping") + Behaviors.stopped + } + + def sideEffect(sideEffects: immutable.Seq[SideEffect[State]]): Unit = + sideEffects.iterator.foreach { effect => + effect match { + case _: Stop.type => shouldStop = true + case _: UnstashAll.type => shouldUnstash = true + case cb: Callback[_] => cb.sideEffect(state) + } + } + + applyEffects(commandHandler(state, cmd).asInstanceOf[EffectImpl[State]], Nil) + + if (shouldStop) Behaviors.stopped + else if (shouldUnstash && stashedCommands.nonEmpty) { + val numStashed = stashedCommands.length + val thisWrappedBehavior = this + + Behaviors.setup { _ => + Behaviors.withStash(numStashed) { stash => + stashedCommands.foreach { sc => + stash.stash(sc) + () // explicit discard + } + stashedCommands.remove(0, numStashed) + stash.unstashAll(thisWrappedBehavior) + } + } + } else this + } + } +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] class PersistenceProbeImpl[T] { + type Element = (T, Long, Set[String]) + + val queue = new ConcurrentLinkedQueue[Element]() + + def persist(elem: Element): Unit = { queue.offer(elem); () } + + def rawExtract(): Element = + queue.poll() match { + case null => throw new AssertionError("No persistence effects in probe") + case elem => elem + } + + def asScala: scaladsl.PersistenceProbe[T] = + new scaladsl.PersistenceProbe[T] { + import scaladsl.{ PersistenceEffect, PersistenceProbe } + + def drain(): Seq[PersistenceEffect[T]] = { + @annotation.tailrec + def iter(acc: List[PersistenceEffect[T]]): List[PersistenceEffect[T]] = { + val elem = queue.poll() + if (elem == null) acc else iter(persistenceEffect(elem) :: acc) + } + + iter(Nil).reverse + } + + def extract(): PersistenceEffect[T] = persistenceEffect(rawExtract()) + + def expectPersistedType[S <: T: ClassTag](): PersistenceEffect[S] = + rawExtract() match { + case (obj: S, sequenceNr, tags) => PersistenceEffect(obj, sequenceNr, tags) + case (extracted, _, _) => + throw new AssertionError( + s"Expected object of type [${implicitly[ClassTag[S]].runtimeClass.getName}] to be persisted, " + + s"but actual was of type [${extracted.getClass.getName}]") + } + + def hasEffects: Boolean = !queue.isEmpty + + def expectPersisted(obj: T): PersistenceProbe[T] = + rawExtract() match { + case (persistedObj, _, _) if obj == persistedObj => this + case (persistedObj, _, _) => + throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj]") + } + + def expectPersisted(obj: T, tag: String): PersistenceProbe[T] = + rawExtract() match { + case (persistedObj, _, tags) if (obj == persistedObj) && (tags(tag)) => this + + case (persistedObj, _, tags) if obj == persistedObj => + throw new AssertionError( + s"Expected persistence with tag [$tag], but actual tags were [${tags.mkString(",")}]") + + case (persistedObj, _, tags) if tags(tag) => + throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj]") + + case (persistedObj, _, tags) => + throw new AssertionError( + s"Expected object [$obj] to be persisted with tag [$tag], " + + s"but actual object was [$persistedObj] with tags [${tags.mkString(",")}]") + } + + def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T] = + rawExtract() match { + case (persistedObj, _, persistedTags) if (obj == persistedObj) && (tags == persistedTags) => this + case (persistedObj, _, persistedTags) if obj == persistedObj => + val unexpected = persistedTags.diff(tags) + val notPersistedWith = tags.diff(persistedTags) + + throw new AssertionError( + s"Expected persistence with [${tags.mkString(",")}], " + + s"but saw unexpected actual tags [${unexpected.mkString(",")}] and " + + s"did not see actual tags [${notPersistedWith.mkString(",")}]") + + case (persistedObj, _, persistedTags) if tags == persistedTags => + throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj}]") + + case (persistedObj, _, persistedTags) => + throw new AssertionError( + s"Expected object [$obj] to be persisted with tags [${tags.mkString(",")}], " + + s"but actual object was [$persistedObj] with tags [${persistedTags.mkString(",")}]") + } + + private def persistenceEffect(elem: Element): PersistenceEffect[T] = + PersistenceEffect(elem._1, elem._2, elem._3) + } + + def asJava: javadsl.PersistenceProbe[T] = + new javadsl.PersistenceProbe[T] { + import java.util.{ List => JList, Set => JSet } + + import javadsl.{ PersistenceEffect, PersistenceProbe } + + def drain(): JList[PersistenceEffect[T]] = { + @annotation.tailrec + def iter(acc: List[PersistenceEffect[T]]): List[PersistenceEffect[T]] = { + val elem = queue.poll() + if (elem == null) acc else iter(persistenceEffect(elem) :: acc) + } + iter(Nil).reverse.asJava + } + + def extract(): PersistenceEffect[T] = persistenceEffect(rawExtract()) + + def expectPersistedClass[S <: T](clazz: Class[S]): PersistenceEffect[S] = + rawExtract() match { + case (obj, sequenceNr, tags) if clazz.isInstance(obj) => + PersistenceEffect(clazz.cast(obj), sequenceNr, tags.asJava) + + case (extracted, _, _) => + throw new AssertionError( + s"Expected object of type [${clazz.getName}] to be persisted, " + + s"but actual was of type [${extracted.getClass.getName}]") + } + + def hasEffects: Boolean = !queue.isEmpty + + def expectPersisted(obj: T): PersistenceProbe[T] = + rawExtract() match { + case (persistedObj, _, _) if obj == persistedObj => this + case (persistedObj, _, _) => + throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj]") + } + + def expectPersisted(obj: T, tag: String): PersistenceProbe[T] = + rawExtract() match { + case (persistedObj, _, tags) if (obj == persistedObj) && (tags(tag)) => this + + case (persistedObj, _, tags) if obj == persistedObj => + throw new AssertionError( + s"Expected persistence with tag [$tag], but actual tags were [${tags.mkString(",")}]") + + case (persistedObj, _, tags) if tags(tag) => + throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj]") + + case (persistedObj, _, tags) => + throw new AssertionError( + s"Expected object [$obj] to be persisted with tag [$tag], " + + s"but actual object was [$persistedObj] with tags [${tags.mkString(",")}]") + } + + def expectPersisted(obj: T, tags: JSet[String]): PersistenceProbe[T] = { + val sTags = tags.asScala + + // Not sure if a Java Set after asScala-ing will compare equal to a Scala Set... + def sameTags(persistedTags: Set[String]): Boolean = + sTags.forall(persistedTags) && persistedTags.forall(sTags) + + rawExtract() match { + case (persistedObj, _, persistedTags) if (obj == persistedObj) && sameTags(persistedTags) => this + + case (persistedObj, _, persistedTags) if obj == persistedObj => + val unexpected = persistedTags.diff(sTags) + val notPersistedWith = sTags.diff(persistedTags) + + throw new AssertionError( + s"Expected persistence with [${sTags.mkString(",")}], " + + s"but saw unexpected actual tags [${unexpected.mkString(",")}] and " + + s"did not see actual tags [${notPersistedWith.mkString(",")}]") + + case (persistedObj, _, persistedTags) if sameTags(persistedTags) => + throw new AssertionError(s"Expected object [$obj] to be persisted, but actual was [$persistedObj}]") + + case (persistedObj, _, persistedTags) => + throw new AssertionError( + s"Expected object [$obj] to be persisted with tags [${sTags.mkString(",")}], " + + s"but actual object was [$persistedObj] with tags [${persistedTags.mkString(",")}]") + } + } + + private def persistenceEffect(element: Element): PersistenceEffect[T] = + PersistenceEffect(element._1, element._2, element._3.asJava) + } +} diff --git a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala new file mode 100644 index 0000000000..30a87ffd41 --- /dev/null +++ b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala @@ -0,0 +1,149 @@ +package org.apache.pekko.persistence.testkit.javadsl + +import java.util.{ List, Set } + +import scala.collection.immutable.{ Set => ScalaSet } + +import org.apache.pekko +import pekko.actor.testkit.typed.javadsl.BehaviorTestKit +import pekko.actor.typed.Behavior +import pekko.annotation.DoNotInherit +import pekko.persistence.testkit.internal.{ PersistenceProbeImpl, Unpersistent } + +/** + * Factory methods to create UnpersistentBehavior instances for testing. + * + * @since 1.3.0 + */ +object UnpersistentBehavior { + + /** + * Given an EventSourcedBehavior, produce a non-persistent Behavior which synchronously publishes events and snapshots + * for inspection. State is updated as in the EventSourcedBehavior, and side effects are performed synchronously. The + * resulting Behavior is, contingent on the command handling, event handling, and side effects being compatible with the + * BehaviorTestKit, testable with the BehaviorTestKit. + * + * The returned Behavior does not intrinsically depend on configuration: it therefore does not serialize and + * assumes an unbounded stash for commands. + * + * @param behavior a (possibly wrapped) EventSourcedBehavior to serve as the basis for the unpersistent behavior + * @param initialState start the unpersistent behavior with this state; if null, behavior's initialState will be used + * @param initialSequenceNr start the unpersistent behavior with this sequence number; only applies if initialState is non-null + * @return an UnpersistentBehavior based on an EventSourcedBehavior + */ + def fromEventSourced[Command, Event, State]( + behavior: Behavior[Command], + initialState: State, + initialSequenceNr: Long): UnpersistentBehavior[Command, Event, State] = { + require(initialSequenceNr >= 0, "initialSequenceNr must be at least zero") + + val initialStateAndSequenceNr = Option(initialState).map(_ -> initialSequenceNr) + val eventProbe = new PersistenceProbeImpl[Event] + val snapshotProbe = new PersistenceProbeImpl[State] + + val b = + Unpersistent.eventSourced(behavior, initialStateAndSequenceNr) { + (event: Event, sequenceNr: Long, tags: ScalaSet[String]) => + eventProbe.persist((event, sequenceNr, tags)) + } { (snapshot, sequenceNr) => + snapshotProbe.persist((snapshot, sequenceNr, ScalaSet.empty)) + } + + new UnpersistentBehavior(b, eventProbe.asJava, snapshotProbe.asJava) + } + + def fromEventSourced[Command, Event, State]( + behavior: Behavior[Command]): UnpersistentBehavior[Command, Event, State] = + fromEventSourced(behavior, null.asInstanceOf[State], 0) + + def fromDurableState[Command, State]( + behavior: Behavior[Command], + initialState: State): UnpersistentBehavior[Command, Void, State] = { + val probe = new PersistenceProbeImpl[State] + val b = + Unpersistent.durableState(behavior, Option(initialState)) { (state, version, tag) => + probe.persist((state, version, if (tag == "") ScalaSet.empty else ScalaSet(tag))) + } + + new UnpersistentBehavior(b, noEventProbe, probe.asJava) + } + + def fromDurableState[Command, State](behavior: Behavior[Command]): UnpersistentBehavior[Command, Void, State] = + fromDurableState(behavior, null.asInstanceOf[State]) + + private val noEventProbe: PersistenceProbe[Void] = + new PersistenceProbe[Void] { + def drain(): List[PersistenceEffect[Void]] = + // could return an empty list, but the intent is that any use of this probe should fail the test + boom() + + def extract(): PersistenceEffect[Void] = boom() + def expectPersistedClass[S <: Void](clazz: Class[S]): PersistenceEffect[S] = boom() + def hasEffects: Boolean = boom() + def expectPersisted(obj: Void): PersistenceProbe[Void] = boom() + def expectPersisted(obj: Void, tag: String): PersistenceProbe[Void] = boom() + def expectPersisted(obj: Void, tags: Set[String]): PersistenceProbe[Void] = boom() + + private def boom() = throw new AssertionError("No events were persisted") + } +} + +final class UnpersistentBehavior[Command, Event, State] private ( + behavior: Behavior[Command], + eventProbe: PersistenceProbe[Event], + stateProbe: PersistenceProbe[State]) { + def getBehavior(): Behavior[Command] = behavior + def getBehaviorTestKit(): BehaviorTestKit[Command] = btk + + /** Note: durable state behaviors will not publish events to this probe */ + def getEventProbe(): PersistenceProbe[Event] = eventProbe + + def getStateProbe(): PersistenceProbe[State] = stateProbe + def getSnapshotProbe(): PersistenceProbe[State] = stateProbe + + private lazy val btk = BehaviorTestKit.create(behavior) +} + +final case class PersistenceEffect[T](persistedObject: T, sequenceNr: Long, tags: Set[String]) + +/** + * Not for user extension + */ +@DoNotInherit +trait PersistenceProbe[T] { + + /** Collect all persistence effects from the probe and empty the probe */ + def drain(): List[PersistenceEffect[T]] + + /** Get and remove the oldest persistence effect from the probe */ + def extract(): PersistenceEffect[T] + + /** + * Get and remove the oldest persistence effect from the probe, failing if the + * persisted object is not of the requested type + */ + def expectPersistedClass[S <: T](clazz: Class[S]): PersistenceEffect[S] + + /** Are there any persistence effects */ + def hasEffects: Boolean + + /** + * Assert that the given object was persisted in the oldest persistence effect and + * remove that persistence effect + */ + def expectPersisted(obj: T): PersistenceProbe[T] + + /** + * Assert that the given object was persisted with the given tag in the oldest persistence + * effect and remove that persistence effect. If the persistence effect has multiple tags, + * only one of them has to match in order for the assertion to succeed. + */ + def expectPersisted(obj: T, tag: String): PersistenceProbe[T] + + /** + * Assert that the given object was persisted with the given tag in the oldest persistence + * effect and remove that persistence effect. If the persistence effect has tags which are + * not given, the assertion fails. + */ + def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T] +} diff --git a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala new file mode 100644 index 0000000000..ee7d2c5876 --- /dev/null +++ b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala @@ -0,0 +1,131 @@ +package org.apache.pekko.persistence.testkit.scaladsl + +import scala.reflect.ClassTag + +import org.apache.pekko +import pekko.actor.testkit.typed.scaladsl.BehaviorTestKit +import pekko.actor.typed.Behavior +import pekko.annotation.DoNotInherit +import pekko.persistence.testkit.internal.{ PersistenceProbeImpl, Unpersistent } + +sealed trait UnpersistentBehavior[Command, State] { + val behavior: Behavior[Command] + lazy val behaviorTestKit = BehaviorTestKit(behavior) + + def stateProbe: PersistenceProbe[State] +} + +/** + * Factory methods to create UnpersistentBehavior instances for testing. + * + * @since 1.3.0 + */ +object UnpersistentBehavior { + + /** + * Given an EventSourcedBehavior, produce a non-persistent Behavior which synchronously publishes events and snapshots + * for inspection. State is updated as in the EventSourcedBehavior, and side effects are performed synchronously. The + * resulting Behavior is, contingent on the command handling, event handling, and side effects being compatible with the + * BehaviorTestKit, testable with the BehaviorTestKit. + * + * The returned Behavior does not intrinsically depend on configuration: it therefore does not serialize and assumes an + * unbounded stash for commands. + */ + def fromEventSourced[Command, Event, State]( + behavior: Behavior[Command], + initialStateAndSequenceNr: Option[(State, Long)] = None): EventSourced[Command, Event, State] = { + val eventProbe = new PersistenceProbeImpl[Event] + val snapshotProbe = new PersistenceProbeImpl[State] + val resultingBehavior = + Unpersistent.eventSourced(behavior, initialStateAndSequenceNr) { + (event: Event, sequenceNr: Long, tags: Set[String]) => + eventProbe.persist((event, sequenceNr, tags)) + } { (snapshot, sequenceNr) => + snapshotProbe.persist((snapshot, sequenceNr, Set.empty)) + } + + EventSourced(resultingBehavior, eventProbe.asScala, snapshotProbe.asScala) + } + + def fromEventSourced[Command, Event, State]( + behavior: Behavior[Command], + initialState: State): EventSourced[Command, Event, State] = + fromEventSourced(behavior, Some(initialState -> 0L)) + + def fromDurableState[Command, State]( + behavior: Behavior[Command], + initialState: Option[State] = None): DurableState[Command, State] = { + val probe = new PersistenceProbeImpl[State] + + val resultingBehavior = + Unpersistent.durableState(behavior, initialState) { (state, version, tag) => + probe.persist((state, version, if (tag.isEmpty) Set.empty else Set(tag))) + } + + DurableState(resultingBehavior, probe.asScala) + } + + final case class EventSourced[Command, Event, State]( + override val behavior: Behavior[Command], + val eventProbe: PersistenceProbe[Event], + override val stateProbe: PersistenceProbe[State]) + extends UnpersistentBehavior[Command, State] { + def apply(f: (BehaviorTestKit[Command], PersistenceProbe[Event], PersistenceProbe[State]) => Unit): Unit = + f(behaviorTestKit, eventProbe, stateProbe) + + def snapshotProbe: PersistenceProbe[State] = stateProbe + } + + final case class DurableState[Command, State]( + override val behavior: Behavior[Command], + override val stateProbe: PersistenceProbe[State]) + extends UnpersistentBehavior[Command, State] { + def apply(f: (BehaviorTestKit[Command], PersistenceProbe[State]) => Unit): Unit = + f(behaviorTestKit, stateProbe) + } +} + +final case class PersistenceEffect[T](persistedObject: T, sequenceNr: Long, tags: Set[String]) + +/** + * Not for user extension + */ +@DoNotInherit +trait PersistenceProbe[T] { + + /** Collect all persistence effects from the probe and empty the probe */ + def drain(): Seq[PersistenceEffect[T]] + + /** Get and remove the oldest persistence effect from the probe */ + def extract(): PersistenceEffect[T] + + /** + * Get and remove the oldest persistence effect from the probe, failing if the + * persisted object is not of the requested type + */ + def expectPersistedType[S <: T: ClassTag](): PersistenceEffect[S] + + /** Are there any persistence effects? */ + def hasEffects: Boolean + + /** + * Assert that the given object was persisted in the oldest persistence effect and + * remove that persistence effect + */ + def expectPersisted(obj: T): PersistenceProbe[T] + + /** + * Assert that the given object was persisted with the given tag in the oldest + * persistence effect and remove that persistence effect. If the persistence + * effect has multiple tags, only one of them has to match in order for the + * assertion to succeed. + */ + def expectPersisted(obj: T, tag: String): PersistenceProbe[T] + + /** + * Assert that the given object was persisted with the given tags in the oldest + * persistence effect and remove that persistence effect. If the persistence + * effect has tags which are not given, the assertion fails. + */ + def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T] +} diff --git a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala new file mode 100644 index 0000000000..d3d3737373 --- /dev/null +++ b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala @@ -0,0 +1,251 @@ +package org.apache.pekko.persistence.testkit.scaladsl + +import org.apache.pekko +import pekko.Done +import pekko.actor.typed.{ Behavior, RecipientRef } +import pekko.actor.typed.scaladsl.{ ActorContext, Behaviors } +import pekko.persistence.typed.PersistenceId +import pekko.persistence.typed.state.RecoveryCompleted +import pekko.persistence.typed.state.scaladsl._ + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +object UnpersistentDurableStateSpec { + object BehaviorUnderTest { + sealed trait Command + + case class Add(n: Int, replyTo: RecipientRef[Done]) extends Command + case class AddIfLessThan(toAdd: Int, ifLessThan: Int, replyTo: RecipientRef[Boolean]) extends Command + case class AddWhenAtLeast(toAdd: Int, whenAtLeast: Int, replyTo: RecipientRef[Done]) extends Command + case class NotifyIfAtLeast(n: Int, notifyTo: RecipientRef[Done], replyTo: RecipientRef[Boolean]) extends Command + case class GetRevisionNumber(replyTo: RecipientRef[Long]) extends Command + + case class State(count: Int, notifyAfter: Map[Int, RecipientRef[Done]], nextNotifyAt: Int) { + def processAdd(n: Int): State = { + val nextCount = count + n + + if (nextCount < nextNotifyAt) copy(count = nextCount) + else { + import scala.collection.mutable + + val (nextNNA, nextNotifyAfter) = { + var lowestNotifyAt = Int.MaxValue + val inProgress = mutable.Map.empty[Int, RecipientRef[Done]] + + notifyAfter.keysIterator.foreach { at => + if (at > nextCount) { + lowestNotifyAt = lowestNotifyAt.min(at) + inProgress += (at -> notifyAfter(at)) + } + } + + lowestNotifyAt -> inProgress.toMap + } + + copy(count = nextCount, notifyAfter = nextNotifyAfter, nextNotifyAt = nextNNA) + } + } + + def addObserver(at: Int, notifyTo: RecipientRef[Done]): State = { + val nextNNA = nextNotifyAt.min(at) + val nextNotifyAfter = notifyAfter.updated(at, notifyTo) + + copy(notifyAfter = nextNotifyAfter, nextNotifyAt = nextNNA) + } + } + + def apply(id: String, recoveryDone: RecipientRef[Done]): Behavior[Command] = + Behaviors.setup { context => + context.setLoggerName(s"entity-$id") + + DurableStateBehavior[Command, State]( + persistenceId = PersistenceId.ofUniqueId(id), + emptyState = State(0, Map.empty, Int.MaxValue), + commandHandler = applyCommand(_, _, context)) + .receiveSignal { + case (state, RecoveryCompleted) => + context.log.debug("Recovered state for id [{}] is [{}]", id, state) + recoveryDone ! Done + } + .withTag("count") + } + + private def applyCommand(state: State, cmd: Command, context: ActorContext[Command]): Effect[State] = { + def persistAdd[Reply](n: Int, replyTo: RecipientRef[Reply], reply: Reply): Effect[State] = { + val newState = state.processAdd(n) + + Effect + .persist(newState) + .thenRun { nextState => // should be the same as newState, but... + state.notifyAfter.keysIterator + .filter { at => + (at <= nextState.nextNotifyAt) && !nextState.notifyAfter.isDefinedAt(at) + } + .foreach { at => + state.notifyAfter(at) ! Done + } + + replyTo ! reply + } + .thenUnstashAll() + } + + cmd match { + case Add(n, replyTo) => persistAdd(n, replyTo, Done) + + case AddIfLessThan(toAdd, ifLessThan, replyTo) => + if (state.count >= ifLessThan) { + context.log.info("Rejecting AddIfLessThan as count = {}", state.count) + Effect.none[State].thenRun(_ => replyTo ! false) + } else persistAdd(toAdd, replyTo, true) + + case AddWhenAtLeast(toAdd, whenAtLeast, replyTo) => + if (state.count < whenAtLeast) Effect.stash() + else persistAdd(toAdd, replyTo, Done) + + case NotifyIfAtLeast(n, notifyTo, replyTo) => + if (state.count >= n) { + Effect.none[State].thenRun { _ => + notifyTo ! Done + replyTo ! true + } + } else if (state.notifyAfter.isDefinedAt(n)) { + Effect.none[State].thenRun(_ => replyTo ! false) + } else { + Effect.persist(state.addObserver(n, notifyTo)).thenRun(_ => replyTo ! true) + } + + case GetRevisionNumber(replyTo) => + Effect.none[State].thenRun(_ => replyTo ! DurableStateBehavior.lastSequenceNumber(context)) + } + } + } +} + +class UnpersistentDurableStateSpec extends AnyWordSpec with Matchers { + import UnpersistentDurableStateSpec._ + + import pekko.actor.testkit.typed.scaladsl._ + + import org.slf4j.event.Level + + "Unpersistent DurableStateBehavior" must { + "generate a fail-fast behavior from a non-DurableStateBehavior" in { + val notDurableState = + Behaviors.receive[Any] { (context, msg) => + context.log.info("Got message {}", msg) + Behaviors.same + } + + val unpersistent = UnpersistentBehavior.fromDurableState[Any, Any](notDurableState) + an[AssertionError] shouldBe thrownBy { unpersistent.behaviorTestKit } + assert(!unpersistent.stateProbe.hasEffects, "should be no persistence effects") + } + + "generate a Behavior from a DurableStateBehavior and process RecoveryCompleted" in { + import BehaviorUnderTest._ + + val recoveryDone = TestInbox[Done]() + val behavior = BehaviorUnderTest("test-1", recoveryDone.ref) + + // accessor-style API + val unpersistent = UnpersistentBehavior.fromDurableState[Command, State](behavior) + val probe = unpersistent.stateProbe + val testkit = unpersistent.behaviorTestKit + + assert(!probe.hasEffects, "should not be persistence yet") + recoveryDone.expectMessage(Done) + val logs = testkit.logEntries() + logs.size shouldBe 1 + logs.head.level shouldBe Level.DEBUG + logs.head.message shouldBe s"Recovered state for id [test-1] is [${State(0, Map.empty, Int.MaxValue)}]" + } + + "publish state changes in response to commands" in { + import BehaviorUnderTest._ + + val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref) + val replyTo = TestInbox[Done]() + + // and the more functional-style API + UnpersistentBehavior.fromDurableState[Command, State](behavior) { (testkit, probe) => + testkit.run(Add(1, replyTo.ref)) + replyTo.expectMessage(Done) + probe.expectPersisted(State(1, Map.empty, Int.MaxValue), tag = "count") + assert(!testkit.hasEffects(), "should have no actor effects") + } + } + + "allow a state to be injected" in { + import BehaviorUnderTest._ + + val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref) + val notify3 = TestInbox[Done]() + val initialState = State(1, Map(3 -> notify3.ref), 3) + + UnpersistentBehavior.fromDurableState[Command, State](behavior, Some(initialState)) { (testkit, probe) => + val logs = testkit.logEntries() + + logs.size shouldBe 1 + logs.head.level shouldBe Level.DEBUG + logs.head.message shouldBe s"Recovered state for id [test-1] is [$initialState]" + assert(!probe.hasEffects, "should be no persistence effect") + assert(!notify3.hasMessages, "no messages should be sent to notify3") + + val replyTo = TestInbox[Done]() + testkit.run(AddWhenAtLeast(2, 2, replyTo.ref)) + assert(!replyTo.hasMessages, "no messages should be sent now") + assert(!notify3.hasMessages, "no messages should be sent to notify3") + assert(!probe.hasEffects, "should be no persistence effect") + assert(!testkit.hasEffects(), "should be no testkit effects") + + testkit.run(Add(3, TestInbox[Done]().ref)) + replyTo.expectMessage(Done) + notify3.expectMessage(Done) + assert(!testkit.hasEffects(), "should be no testkit effects") + probe.drain() should contain theSameElementsInOrderAs Seq( + PersistenceEffect(State(4, Map.empty, Int.MaxValue), 1, Set("count")), + PersistenceEffect(State(6, Map.empty, Int.MaxValue), 2, Set("count"))) + } + } + + "stash and unstash properly" in { + import BehaviorUnderTest._ + + val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref) + val replyTo1 = TestInbox[Done]() + val add = Add(1, TestInbox[Done]().ref) + + UnpersistentBehavior.fromDurableState[Command, State](behavior) { (testkit, probe) => + // stashes + testkit.run(AddWhenAtLeast(1, 1, replyTo1.ref)) + assert(!probe.hasEffects, "should be no persistence effect") + assert(!replyTo1.hasMessages, "count is not yet 1") + + // unstashes + testkit.run(add) + replyTo1.expectMessage(Done) + probe.drain() shouldNot be(empty) + + // unstash but nothing in the stash + testkit.run(add) + assert(!replyTo1.hasMessages, "should not send again") + probe.drain() shouldNot be(empty) + } + } + + "retrieve revision number" in { + import BehaviorUnderTest._ + + val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref) + + val replyTo = TestInbox[Long]() + UnpersistentBehavior.fromDurableState[Command, State](behavior) { (testkit, probe) => + testkit.run(GetRevisionNumber(replyTo.ref)) + (the[AssertionError] thrownBy (probe.extract())).getMessage shouldBe "No persistence effects in probe" + replyTo.expectMessage(0) + } + } + } +} diff --git a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala new file mode 100644 index 0000000000..d3d3737373 --- /dev/null +++ b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala @@ -0,0 +1,251 @@ +package org.apache.pekko.persistence.testkit.scaladsl + +import org.apache.pekko +import pekko.Done +import pekko.actor.typed.{ Behavior, RecipientRef } +import pekko.actor.typed.scaladsl.{ ActorContext, Behaviors } +import pekko.persistence.typed.PersistenceId +import pekko.persistence.typed.state.RecoveryCompleted +import pekko.persistence.typed.state.scaladsl._ + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +object UnpersistentDurableStateSpec { + object BehaviorUnderTest { + sealed trait Command + + case class Add(n: Int, replyTo: RecipientRef[Done]) extends Command + case class AddIfLessThan(toAdd: Int, ifLessThan: Int, replyTo: RecipientRef[Boolean]) extends Command + case class AddWhenAtLeast(toAdd: Int, whenAtLeast: Int, replyTo: RecipientRef[Done]) extends Command + case class NotifyIfAtLeast(n: Int, notifyTo: RecipientRef[Done], replyTo: RecipientRef[Boolean]) extends Command + case class GetRevisionNumber(replyTo: RecipientRef[Long]) extends Command + + case class State(count: Int, notifyAfter: Map[Int, RecipientRef[Done]], nextNotifyAt: Int) { + def processAdd(n: Int): State = { + val nextCount = count + n + + if (nextCount < nextNotifyAt) copy(count = nextCount) + else { + import scala.collection.mutable + + val (nextNNA, nextNotifyAfter) = { + var lowestNotifyAt = Int.MaxValue + val inProgress = mutable.Map.empty[Int, RecipientRef[Done]] + + notifyAfter.keysIterator.foreach { at => + if (at > nextCount) { + lowestNotifyAt = lowestNotifyAt.min(at) + inProgress += (at -> notifyAfter(at)) + } + } + + lowestNotifyAt -> inProgress.toMap + } + + copy(count = nextCount, notifyAfter = nextNotifyAfter, nextNotifyAt = nextNNA) + } + } + + def addObserver(at: Int, notifyTo: RecipientRef[Done]): State = { + val nextNNA = nextNotifyAt.min(at) + val nextNotifyAfter = notifyAfter.updated(at, notifyTo) + + copy(notifyAfter = nextNotifyAfter, nextNotifyAt = nextNNA) + } + } + + def apply(id: String, recoveryDone: RecipientRef[Done]): Behavior[Command] = + Behaviors.setup { context => + context.setLoggerName(s"entity-$id") + + DurableStateBehavior[Command, State]( + persistenceId = PersistenceId.ofUniqueId(id), + emptyState = State(0, Map.empty, Int.MaxValue), + commandHandler = applyCommand(_, _, context)) + .receiveSignal { + case (state, RecoveryCompleted) => + context.log.debug("Recovered state for id [{}] is [{}]", id, state) + recoveryDone ! Done + } + .withTag("count") + } + + private def applyCommand(state: State, cmd: Command, context: ActorContext[Command]): Effect[State] = { + def persistAdd[Reply](n: Int, replyTo: RecipientRef[Reply], reply: Reply): Effect[State] = { + val newState = state.processAdd(n) + + Effect + .persist(newState) + .thenRun { nextState => // should be the same as newState, but... + state.notifyAfter.keysIterator + .filter { at => + (at <= nextState.nextNotifyAt) && !nextState.notifyAfter.isDefinedAt(at) + } + .foreach { at => + state.notifyAfter(at) ! Done + } + + replyTo ! reply + } + .thenUnstashAll() + } + + cmd match { + case Add(n, replyTo) => persistAdd(n, replyTo, Done) + + case AddIfLessThan(toAdd, ifLessThan, replyTo) => + if (state.count >= ifLessThan) { + context.log.info("Rejecting AddIfLessThan as count = {}", state.count) + Effect.none[State].thenRun(_ => replyTo ! false) + } else persistAdd(toAdd, replyTo, true) + + case AddWhenAtLeast(toAdd, whenAtLeast, replyTo) => + if (state.count < whenAtLeast) Effect.stash() + else persistAdd(toAdd, replyTo, Done) + + case NotifyIfAtLeast(n, notifyTo, replyTo) => + if (state.count >= n) { + Effect.none[State].thenRun { _ => + notifyTo ! Done + replyTo ! true + } + } else if (state.notifyAfter.isDefinedAt(n)) { + Effect.none[State].thenRun(_ => replyTo ! false) + } else { + Effect.persist(state.addObserver(n, notifyTo)).thenRun(_ => replyTo ! true) + } + + case GetRevisionNumber(replyTo) => + Effect.none[State].thenRun(_ => replyTo ! DurableStateBehavior.lastSequenceNumber(context)) + } + } + } +} + +class UnpersistentDurableStateSpec extends AnyWordSpec with Matchers { + import UnpersistentDurableStateSpec._ + + import pekko.actor.testkit.typed.scaladsl._ + + import org.slf4j.event.Level + + "Unpersistent DurableStateBehavior" must { + "generate a fail-fast behavior from a non-DurableStateBehavior" in { + val notDurableState = + Behaviors.receive[Any] { (context, msg) => + context.log.info("Got message {}", msg) + Behaviors.same + } + + val unpersistent = UnpersistentBehavior.fromDurableState[Any, Any](notDurableState) + an[AssertionError] shouldBe thrownBy { unpersistent.behaviorTestKit } + assert(!unpersistent.stateProbe.hasEffects, "should be no persistence effects") + } + + "generate a Behavior from a DurableStateBehavior and process RecoveryCompleted" in { + import BehaviorUnderTest._ + + val recoveryDone = TestInbox[Done]() + val behavior = BehaviorUnderTest("test-1", recoveryDone.ref) + + // accessor-style API + val unpersistent = UnpersistentBehavior.fromDurableState[Command, State](behavior) + val probe = unpersistent.stateProbe + val testkit = unpersistent.behaviorTestKit + + assert(!probe.hasEffects, "should not be persistence yet") + recoveryDone.expectMessage(Done) + val logs = testkit.logEntries() + logs.size shouldBe 1 + logs.head.level shouldBe Level.DEBUG + logs.head.message shouldBe s"Recovered state for id [test-1] is [${State(0, Map.empty, Int.MaxValue)}]" + } + + "publish state changes in response to commands" in { + import BehaviorUnderTest._ + + val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref) + val replyTo = TestInbox[Done]() + + // and the more functional-style API + UnpersistentBehavior.fromDurableState[Command, State](behavior) { (testkit, probe) => + testkit.run(Add(1, replyTo.ref)) + replyTo.expectMessage(Done) + probe.expectPersisted(State(1, Map.empty, Int.MaxValue), tag = "count") + assert(!testkit.hasEffects(), "should have no actor effects") + } + } + + "allow a state to be injected" in { + import BehaviorUnderTest._ + + val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref) + val notify3 = TestInbox[Done]() + val initialState = State(1, Map(3 -> notify3.ref), 3) + + UnpersistentBehavior.fromDurableState[Command, State](behavior, Some(initialState)) { (testkit, probe) => + val logs = testkit.logEntries() + + logs.size shouldBe 1 + logs.head.level shouldBe Level.DEBUG + logs.head.message shouldBe s"Recovered state for id [test-1] is [$initialState]" + assert(!probe.hasEffects, "should be no persistence effect") + assert(!notify3.hasMessages, "no messages should be sent to notify3") + + val replyTo = TestInbox[Done]() + testkit.run(AddWhenAtLeast(2, 2, replyTo.ref)) + assert(!replyTo.hasMessages, "no messages should be sent now") + assert(!notify3.hasMessages, "no messages should be sent to notify3") + assert(!probe.hasEffects, "should be no persistence effect") + assert(!testkit.hasEffects(), "should be no testkit effects") + + testkit.run(Add(3, TestInbox[Done]().ref)) + replyTo.expectMessage(Done) + notify3.expectMessage(Done) + assert(!testkit.hasEffects(), "should be no testkit effects") + probe.drain() should contain theSameElementsInOrderAs Seq( + PersistenceEffect(State(4, Map.empty, Int.MaxValue), 1, Set("count")), + PersistenceEffect(State(6, Map.empty, Int.MaxValue), 2, Set("count"))) + } + } + + "stash and unstash properly" in { + import BehaviorUnderTest._ + + val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref) + val replyTo1 = TestInbox[Done]() + val add = Add(1, TestInbox[Done]().ref) + + UnpersistentBehavior.fromDurableState[Command, State](behavior) { (testkit, probe) => + // stashes + testkit.run(AddWhenAtLeast(1, 1, replyTo1.ref)) + assert(!probe.hasEffects, "should be no persistence effect") + assert(!replyTo1.hasMessages, "count is not yet 1") + + // unstashes + testkit.run(add) + replyTo1.expectMessage(Done) + probe.drain() shouldNot be(empty) + + // unstash but nothing in the stash + testkit.run(add) + assert(!replyTo1.hasMessages, "should not send again") + probe.drain() shouldNot be(empty) + } + } + + "retrieve revision number" in { + import BehaviorUnderTest._ + + val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref) + + val replyTo = TestInbox[Long]() + UnpersistentBehavior.fromDurableState[Command, State](behavior) { (testkit, probe) => + testkit.run(GetRevisionNumber(replyTo.ref)) + (the[AssertionError] thrownBy (probe.extract())).getMessage shouldBe "No persistence effects in probe" + replyTo.expectMessage(0) + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
