This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch unpersistent
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 0be5d0546db2ee1aa0637f4eca9184099f6f2242
Author: He-Pin <[email protected]>
AuthorDate: Sat Nov 8 16:16:04 2025 +0800

    feat: Add unpersistent versions of persistent behaviors
---
 .../typed/AccountExampleUnpersistentDocTest.java   | 150 +++++++
 .../ExternalShardAllocationCompileOnlyTest.java    |   5 +-
 .../typed/AccountExampleUnpersistentDocSpec.scala  | 102 +++++
 docs/src/main/paradox/typed/persistence-testing.md |  48 ++-
 docs/src/main/paradox/typed/testing-sync.md        |   2 +-
 .../testkit/internal/Unpersistent.scala            | 466 +++++++++++++++++++++
 .../testkit/javadsl/UnpersistentBehavior.scala     | 149 +++++++
 .../testkit/scaladsl/UnpersistentBehavior.scala    | 131 ++++++
 .../scaladsl/UnpersistentDurableStateSpec.scala    | 251 +++++++++++
 .../scaladsl/UnpersistentEventSourcedSpec.scala    | 251 +++++++++++
 10 files changed, 1548 insertions(+), 7 deletions(-)

diff --git 
a/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java
 
b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java
new file mode 100644
index 0000000000..66c284f946
--- /dev/null
+++ 
b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java
@@ -0,0 +1,150 @@
+package jdocs.org.apache.pekko.cluster.sharding.typed;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.pattern.StatusReply;
+import org.apache.pekko.testkit.package$;
+import org.scalatestplus.junit.JUnitSuite;
+
+import static 
jdocs.org.apache.pekko.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity;
+import static org.junit.Assert.*;
+
+// #test
+import java.math.BigDecimal;
+import org.apache.pekko.actor.testkit.typed.javadsl.BehaviorTestKit;
+import org.apache.pekko.actor.testkit.typed.javadsl.ReplyInbox;
+import org.apache.pekko.actor.testkit.typed.javadsl.StatusReplyInbox;
+import org.apache.pekko.actor.testkit.typed.javadsl.TestInbox;
+import org.apache.pekko.persistence.testkit.javadsl.UnpersistentBehavior;
+import org.apache.pekko.persistence.testkit.javadsl.PersistenceEffect;
+import org.apache.pekko.persistence.typed.PersistenceId;
+import org.junit.Test;
+
+/**
+ * Synchronous tests of {@code AccountEntity} run through an {@code UnpersistentBehavior}: commands
+ * are executed with a {@code BehaviorTestKit} and persisted events are asserted on via the event
+ * probe instead of being written to a journal.
+ */
+public class AccountExampleUnpersistentDocTest
+    // #test
+    extends JUnitSuite
+// #test
+{
+    /**
+     * Sends {@code CreateAccount} to the behavior from {@link #emptyAccount()} and checks the
+     * {@code Done} reply, the persisted {@code AccountCreated} event, that no snapshot effect was
+     * recorded, and that the reported balance is zero.
+     */
+    @Test
+    public void createWithEmptyBalance() {
+        UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+            unpersistent = emptyAccount();
+
+        BehaviorTestKit<AccountEntity.Command> testkit = 
unpersistent.getBehaviorTestKit();
+
+        StatusReplyInbox<Done> ackInbox = 
testkit.runAskWithStatus(AccountEntity.CreateAccount::new);
+
+        ackInbox.expectValue(Done.getInstance());
+        
unpersistent.getEventProbe().expectPersisted(AccountEntity.AccountCreated.INSTANCE);
+
+        // internal state is only exposed by the behavior via responses to 
messages or if it happens
+        //  to snapshot.  This particular behavior never snapshots, so we 
query within the actor's
+        //  protocol
+        assertFalse(unpersistent.getSnapshotProbe().hasEffects());
+
+        ReplyInbox<AccountEntity.CurrentBalance> currentBalanceInbox =
+            testkit.runAsk(AccountEntity.GetBalance::new);
+
+        assertEquals(BigDecimal.ZERO, 
currentBalanceInbox.receiveReply().balance);
+    }
+
+    /**
+     * Starting from {@link #openedAccount()}, deposits 100 and then withdraws 10, checking the
+     * reply, the persisted event and the balance reported by {@code GetBalance} after each step.
+     */
+    @Test
+    public void handleDepositAndWithdraw() {
+        UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+            unpersistent = openedAccount();
+
+        BehaviorTestKit<AccountEntity.Command> testkit = 
unpersistent.getBehaviorTestKit();
+        BigDecimal currentBalance;
+
+        testkit
+            .runAskWithStatus(
+                Done.class, replyTo -> new 
AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo))
+            .expectValue(Done.getInstance());
+
+        assertEquals(
+            BigDecimal.valueOf(100),
+            unpersistent
+                .getEventProbe()
+                .expectPersistedClass(AccountEntity.Deposited.class)
+                .persistedObject()
+                .amount);
+
+        currentBalance =
+            testkit
+                .runAsk(AccountEntity.CurrentBalance.class, 
AccountEntity.GetBalance::new)
+                .receiveReply()
+                .balance;
+
+        assertEquals(BigDecimal.valueOf(100), currentBalance);
+
+        testkit
+            .runAskWithStatus(
+                Done.class, replyTo -> new 
AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo))
+            .expectValue(Done.getInstance());
+
+        // can save the persistence effect for in-depth inspection
+        PersistenceEffect<AccountEntity.Withdrawn> withdrawEffect =
+            
unpersistent.getEventProbe().expectPersistedClass(AccountEntity.Withdrawn.class);
+        assertEquals(BigDecimal.valueOf(10), 
withdrawEffect.persistedObject().amount);
+        // openedAccount() starts at sequence number 1: the deposit was 2, so the withdrawal is 3
+        assertEquals(3L, withdrawEffect.sequenceNr());
+        assertTrue(withdrawEffect.tags().isEmpty());
+
+        currentBalance =
+            testkit
+                .runAsk(AccountEntity.CurrentBalance.class, 
AccountEntity.GetBalance::new)
+                .receiveReply()
+                .balance;
+
+        assertEquals(BigDecimal.valueOf(90), currentBalance);
+    }
+
+    /**
+     * Withdrawing 110 from an account holding 100 must answer with the "not enough funds" error
+     * and must not persist any event.
+     */
+    @Test
+    public void rejectWithdrawOverdraft() {
+        UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+            unpersistent = accountWithBalance(BigDecimal.valueOf(100));
+
+        BehaviorTestKit<AccountEntity.Command> testkit = 
unpersistent.getBehaviorTestKit();
+
+        testkit
+            .runAskWithStatus(
+                Done.class, replyTo -> new 
AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo))
+            .expectErrorMessage("not enough funds to withdraw 110");
+
+        assertFalse(unpersistent.getEventProbe().hasEffects());
+    }
+
+    // #test
+    /** Account behavior created with a {@code null} provided state and sequence number 0. */
+    private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+    emptyAccount() {
+        return
+            // #unpersistent-behavior
+            UnpersistentBehavior.fromEventSourced(
+                AccountEntity.create("1", PersistenceId.of("Account", "1")),
+                null, // use the initial state
+                0 // initial sequence number
+            );
+        // #unpersistent-behavior
+    }
+
+    /**
+     * Account behavior whose provided state is {@code new EmptyAccount().openedAccount()}, at
+     * sequence number 1.
+     */
+    private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+    openedAccount() {
+        return
+            // #unpersistent-behavior-provided-state
+            UnpersistentBehavior.fromEventSourced(
+                AccountEntity.create("1", PersistenceId.of("Account", "1")),
+                new AccountEntity.EmptyAccount()
+                    .openedAccount(), // duplicate the event handler for 
AccountCreated on an EmptyAccount
+                1 // assume that CreateAccount was the first command
+            );
+        // #unpersistent-behavior-provided-state
+    }
+
+    /**
+     * Account behavior whose provided state is an {@code OpenedAccount} holding {@code balance},
+     * at sequence number 2.
+     */
+    private UnpersistentBehavior<AccountEntity.Command, AccountEntity.Event, 
AccountEntity.Account>
+    accountWithBalance(BigDecimal balance) {
+        return UnpersistentBehavior.fromEventSourced(
+            AccountEntity.create("1", PersistenceId.of("Account", "1")),
+            new AccountEntity.OpenedAccount(balance),
+            2);
+    }
+    // #test
+}
+// #test
diff --git 
a/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java
 
b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java
index fd6947841b..1d2c105ff8 100644
--- 
a/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java
+++ 
b/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/ExternalShardAllocationCompileOnlyTest.java
@@ -28,6 +28,7 @@ import 
org.apache.pekko.cluster.sharding.typed.ShardingEnvelope;
 import org.apache.pekko.cluster.sharding.typed.javadsl.ClusterSharding;
 import org.apache.pekko.cluster.sharding.typed.javadsl.Entity;
 import org.apache.pekko.cluster.sharding.typed.javadsl.EntityTypeKey;
+import org.apache.pekko.util.Timeout;
 
 public class ExternalShardAllocationCompileOnlyTest {
 
@@ -43,8 +44,8 @@ public class ExternalShardAllocationCompileOnlyTest {
         sharding.init(
             Entity.of(typeKey, ctx -> Counter.create(ctx.getEntityId()))
                 .withAllocationStrategy(
-                    ExternalShardAllocationStrategy.create(
-                        system, typeKey.name(), Duration.ofSeconds(5))));
+                    new ExternalShardAllocationStrategy(
+                        system, typeKey.name(), 
Timeout.create(Duration.ofSeconds(5)))));
     // #entity
 
     // #client
diff --git 
a/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala
 
b/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala
new file mode 100644
index 0000000000..546a113615
--- /dev/null
+++ 
b/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala
@@ -0,0 +1,102 @@
+package docs.org.apache.pekko.cluster.sharding.typed
+
+import org.apache.pekko
+import pekko.Done
+import pekko.pattern.StatusReply
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+// #test
+import pekko.actor.testkit.typed.scaladsl.TestInbox
+import pekko.persistence.testkit.scaladsl.UnpersistentBehavior
+import pekko.persistence.typed.PersistenceId
+
+/**
+ * Synchronous tests of `AccountEntity` run through an `UnpersistentBehavior`: each helper at the
+ * bottom of the class is applied to a function receiving the testkit, the event probe and the
+ * snapshot probe.
+ */
+class AccountExampleUnpersistentDocSpec
+    extends AnyWordSpecLike
+    // #test
+    with Matchers
+    // #test
+    {
+  // #test
+  import AccountExampleWithEventHandlersInState.AccountEntity
+  // #test
+  "Account" must {
+    "be created with zero balance" in {
+      val replyToInbox = TestInbox[StatusReply[Done]]()
+      val getBalanceInbox = TestInbox[AccountEntity.CurrentBalance]()
+
+      onAnEmptyAccount { (testkit, eventProbe, snapshotProbe) =>
+        testkit.run(AccountEntity.CreateAccount(replyToInbox.ref))
+        replyToInbox.expectMessage(StatusReply.Ack)
+
+        eventProbe.expectPersisted(AccountEntity.AccountCreated)
+
+        // internal state is only exposed by the behavior via responses to 
messages or if it happens
+        //  to snapshot.  This particular behavior never snapshots, so we 
query within the actor's
+        //  protocol
+        snapshotProbe.hasEffects shouldBe false
+
+        testkit.run(AccountEntity.GetBalance(getBalanceInbox.ref))
+
+        getBalanceInbox.receiveMessage().balance shouldBe 0
+      }
+    }
+
+    "handle Deposit and Withdraw" in {
+      val replyToInbox = TestInbox[StatusReply[Done]]()
+      val getBalanceInbox = TestInbox[AccountEntity.CurrentBalance]()
+
+      onAnOpenedAccount { (testkit, eventProbe, _) =>
+        testkit.run(AccountEntity.Deposit(100, replyToInbox.ref))
+
+        replyToInbox.expectMessage(StatusReply.Ack)
+        eventProbe.expectPersisted(AccountEntity.Deposited(100))
+
+        testkit.run(AccountEntity.Withdraw(10, replyToInbox.ref))
+
+        replyToInbox.expectMessage(StatusReply.Ack)
+        eventProbe.expectPersisted(AccountEntity.Withdrawn(10))
+
+        testkit.run(AccountEntity.GetBalance(getBalanceInbox.ref))
+
+        getBalanceInbox.receiveMessage().balance shouldBe 90
+      }
+    }
+
+    "reject Withdraw overdraft" in {
+      val replyToInbox = TestInbox[StatusReply[Done]]()
+
+      onAnAccountWithBalance(100) { (testkit, eventProbe, _) =>
+        testkit.run(AccountEntity.Withdraw(110, replyToInbox.ref))
+
+        // the overdraft is answered with an error reply and nothing is persisted
+        replyToInbox.receiveMessage().isError shouldBe true
+        eventProbe.hasEffects shouldBe false
+      }
+    }
+  }
+  // #test
+
+  // AccountEntity "1" with no state or sequence number supplied to fromEventSourced
+  // #unpersistent-behavior
+  private def onAnEmptyAccount
+      : UnpersistentBehavior.EventSourced[AccountEntity.Command, 
AccountEntity.Event, AccountEntity.Account] =
+    UnpersistentBehavior.fromEventSourced(AccountEntity("1", 
PersistenceId("Account", "1")))
+  // #unpersistent-behavior
+
+  // AccountEntity "1" whose provided state is EmptyAccount with AccountCreated applied, at sequence number 1
+  // #unpersistent-behavior-provided-state
+  private def onAnOpenedAccount
+      : UnpersistentBehavior.EventSourced[AccountEntity.Command, 
AccountEntity.Event, AccountEntity.Account] =
+    UnpersistentBehavior.fromEventSourced(
+      AccountEntity("1", PersistenceId("Account", "1")),
+      Some(
+        AccountEntity.EmptyAccount.applyEvent(AccountEntity.AccountCreated) -> 
// reuse the event handler
+        1L // assume that CreateAccount was the first command
+      ))
+  // #unpersistent-behavior-provided-state
+
+  // AccountEntity "1" whose provided state is an OpenedAccount holding `balance`, at sequence number 2
+  private def onAnAccountWithBalance(balance: BigDecimal) =
+    UnpersistentBehavior.fromEventSourced(
+      AccountEntity("1", PersistenceId("Account", "1")),
+      Some(AccountEntity.OpenedAccount(balance) -> 2L))
+  // #test
+}
+// #test
diff --git a/docs/src/main/paradox/typed/persistence-testing.md 
b/docs/src/main/paradox/typed/persistence-testing.md
index 46d828c153..f73c9b67bc 100644
--- a/docs/src/main/paradox/typed/persistence-testing.md
+++ b/docs/src/main/paradox/typed/persistence-testing.md
@@ -19,9 +19,49 @@ To use Pekko Persistence TestKit, add the module to your 
project:
 
 @@project-info{ projectId="persistence-testkit" }
 
-## Unit testing
+## Unit testing with the BehaviorTestKit
 
-**Note!** The `EventSourcedBehaviorTestKit` is a new feature, api may have 
changes breaking source compatibility in future versions.
+**Note!** The `UnpersistentBehavior` is a new feature: the API may have 
changes breaking source compatibility in future versions.
+
+Unit testing of `EventSourcedBehavior` can be performed by converting it into 
an @apidoc[UnpersistentBehavior]. Instead of
+persisting events and snapshots, the `UnpersistentBehavior` exposes 
@apidoc[PersistenceProbe]s for events and snapshots which
+can be asserted on.
+
+Scala
+: @@snip 
[AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala)
 { #unpersistent-behavior }
+
+Java
+: @@snip 
[AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java)
 { #unpersistent-behavior }
+
+The `UnpersistentBehavior` can be initialized with arbitrary states:
+
+Scala
+: @@snip 
[AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala)
 { #unpersistent-behavior-provided-state }
+
+Java
+: @@snip 
[AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java)
 { #unpersistent-behavior-provided-state }
+
+The `UnpersistentBehavior` is especially well-suited to the synchronous 
@ref:[`BehaviorTestKit`](testing-sync.md#synchronous-behavior-testing):
+the `UnpersistentBehavior` can directly construct a `BehaviorTestKit` wrapping 
the behavior.  When commands are run by `BehaviorTestKit`,
+they are processed in the calling thread (viz. the test suite), so when the 
run returns, the suite can be sure that the message has been
+fully processed.  The internal state of the `EventSourcedBehavior` is not 
exposed to the suite except to the extent that it affects how
+the behavior responds to commands or the events it persists (in addition, any 
snapshots made by the behavior are available through a
+`PersistenceProbe`).
+
+A full test for the `AccountEntity`, which is shown in the @ref:[Persistence 
Style Guide](persistence-style.md), might look like:
+
+Scala
+: @@snip 
[AccountExampleUnpersistentDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocSpec.scala)
 { #test }
+
+Java
+: @@snip 
[AccountExampleUnpersistentDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleUnpersistentDocTest.java)
 { #test }
+
+`UnpersistentBehavior` does not require any configuration.  It therefore does 
not verify the serialization of commands, events, or state.
+If using this style, it is advised to independently test serialization for 
those classes.
+
+## Unit testing with the ActorTestKit and EventSourcedBehaviorTestKit
+
+**Note!** The `EventSourcedBehaviorTestKit` is a new feature: the API may have 
changes breaking source compatibility in future versions.
 
 Unit testing of `EventSourcedBehavior` can be done with the 
@apidoc[EventSourcedBehaviorTestKit]. It supports running
 one command at a time and you can assert that the synchronously returned 
result is as expected. The result contains the
@@ -35,7 +75,7 @@ Scala
 :  @@snip 
[AccountExampleDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocSpec.scala)
 { #testkit }
 
 Java
-:  @@snip 
[AccountExampleDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocTest.java)
 { #testkit } 
+:  @@snip 
[AccountExampleDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocTest.java)
 { #testkit } 
 
 A full test for the `AccountEntity`, which is shown in the @ref:[Persistence 
Style Guide](persistence-style.md), may look like this:
 
@@ -43,7 +83,7 @@ Scala
 :  @@snip 
[AccountExampleDocSpec.scala](/cluster-sharding-typed/src/test/scala/docs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocSpec.scala)
 { #test }
 
 Java
-:  @@snip 
[AccountExampleDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocTest.java)
 { #test }  
+:  @@snip 
[AccountExampleDocTest.java](/cluster-sharding-typed/src/test/java/jdocs/org/apache/pekko/cluster/sharding/typed/AccountExampleDocTest.java)
 { #test }  
 
 Serialization of commands, events and state are verified automatically. The 
serialization checks can be
 customized with the `SerializationSettings` when creating the 
`EventSourcedBehaviorTestKit`. By default,
diff --git a/docs/src/main/paradox/typed/testing-sync.md 
b/docs/src/main/paradox/typed/testing-sync.md
index f496bb76d9..dd8eae0c0c 100644
--- a/docs/src/main/paradox/typed/testing-sync.md
+++ b/docs/src/main/paradox/typed/testing-sync.md
@@ -12,7 +12,7 @@ limitations:
 * Spawning of @scala[`Future`]@java[`CompletionStage`] or other asynchronous 
task and you rely on a callback to
   complete before observing the effect you want to test.
 * Usage of scheduler is not supported.
-* `EventSourcedBehavior` can't be tested.
+* `EventSourcedBehavior` can't be fully tested, but it is possible to 
@ref:[test the core 
functionality](persistence-testing.md#unit-testing-with-the-behaviortestkit).
 * Interactions with other actors must be stubbed.
 * Blackbox testing style.
 * Supervision is not supported.
diff --git 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala
 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala
new file mode 100644
index 0000000000..de9077844a
--- /dev/null
+++ 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/internal/Unpersistent.scala
@@ -0,0 +1,466 @@
+package org.apache.pekko.persistence.testkit.internal
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.annotation.tailrec
+import scala.collection.immutable
+import scala.collection.mutable.ListBuffer
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+
+import org.apache.pekko
+import pekko.actor.typed.Behavior
+import pekko.actor.typed.internal.BehaviorImpl.DeferredBehavior
+import pekko.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
+import pekko.annotation.InternalApi
+import pekko.persistence.testkit.{ javadsl, scaladsl }
+import pekko.persistence.typed.internal.EventSourcedBehaviorImpl
+import pekko.persistence.typed.internal.Running.WithSeqNrAccessible
+import pekko.persistence.typed.state.internal.DurableStateBehaviorImpl
+import pekko.persistence.typed.state.internal.Running.WithRevisionAccessible
+import pekko.util.ConstantFun.{ scalaAnyToUnit => doNothing }
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] object Unpersistent {
+
+  /**
+   * Wraps an event-sourced `behavior` so that it runs without a journal or snapshot store.
+   *
+   * When the returned behavior is set up, `behavior` is unwrapped (following deferred behaviors) until an
+   * `EventSourcedBehaviorImpl` is reached; an `AssertionError` is thrown if none is found.
+   *
+   * @param behavior               behavior expected to resolve to an `EventSourcedBehaviorImpl`
+   * @param fromStateAndSequenceNr starting state and sequence number; when empty, the behavior's
+   *                               `emptyState` and `0` are used
+   * @param onEvent                receives (event, sequence number, tags) for every persisted event
+   * @param onSnapshot             receives (state, sequence number) for every snapshot
+   */
+  def eventSourced[Command, Event, State](behavior: Behavior[Command], fromStateAndSequenceNr: Option[(State, Long)])(
+      onEvent: (Event, Long, Set[String]) => Unit)(onSnapshot: (State, Long) => Unit): Behavior[Command] = {
+    // The implementation case must stay ahead of the deferred case so that it is matched first
+    @tailrec
+    def unwrap(
+        candidate: Behavior[Command],
+        ctx: ActorContext[Command]): Option[EventSourcedBehaviorImpl[Command, Event, State]] =
+      candidate match {
+        case found: EventSourcedBehaviorImpl[Command, _, _] =>
+          Some(found.asInstanceOf[EventSourcedBehaviorImpl[Command, Event, State]])
+        case deferred: DeferredBehavior[Command] => unwrap(deferred(ctx), ctx)
+        case _                                   => None
+      }
+
+    Behaviors.setup[Command] { ctx =>
+      unwrap(behavior, ctx) match {
+        case Some(found) =>
+          val (startState, startSequenceNr) = fromStateAndSequenceNr match {
+            case Some(provided) => provided
+            case None           => (found.emptyState, 0L)
+          }
+          new WrappedEventSourcedBehavior(ctx, found, startState, startSequenceNr, onEvent, onSnapshot)
+
+        case None =>
+          throw new AssertionError("Did not find the expected EventSourcedBehavior")
+      }
+    }
+  }
+
+  /**
+   * Wraps a durable-state `behavior` so that it runs without a durable state store.
+   *
+   * When the returned behavior is set up, `behavior` is unwrapped (following deferred behaviors) until a
+   * `DurableStateBehaviorImpl` is reached; an `AssertionError` is thrown if none is found.
+   *
+   * @param behavior  behavior expected to resolve to a `DurableStateBehaviorImpl`
+   * @param fromState starting state; when empty, the behavior's `emptyState` is used
+   * @param onPersist receives (state, revision, tag) each time the state is persisted
+   */
+  def durableState[Command, State](behavior: Behavior[Command], fromState: Option[State])(
+      onPersist: (State, Long, String) => Unit): Behavior[Command] = {
+
+    // The implementation case must stay ahead of the deferred case so that it is matched first
+    @tailrec
+    def unwrap(
+        candidate: Behavior[Command],
+        ctx: ActorContext[Command]): Option[DurableStateBehaviorImpl[Command, State]] =
+      candidate match {
+        case found: DurableStateBehaviorImpl[Command, _] =>
+          Some(found.asInstanceOf[DurableStateBehaviorImpl[Command, State]])
+        case deferred: DeferredBehavior[Command] => unwrap(deferred(ctx), ctx)
+        case _                                   => None
+      }
+
+    Behaviors.setup[Command] { ctx =>
+      unwrap(behavior, ctx) match {
+        case Some(found) =>
+          val startState = fromState match {
+            case Some(provided) => provided
+            case None           => found.emptyState
+          }
+          new WrappedDurableStateBehavior(ctx, found, startState, onPersist)
+
+        case None =>
+          throw new AssertionError("Did not find the expected DurableStateBehavior")
+      }
+    }
+  }
+
+  /**
+   * In-memory stand-in for an `EventSourcedBehaviorImpl`.
+   *
+   * Commands are run through the wrapped behavior's command handler; persisted events are applied with its
+   * event handler and reported through `onEvent`, and snapshots are reported through `onSnapshot`.
+   * `RecoveryCompleted` is signalled once, when the instance is constructed.
+   *
+   * @param context           context of the actor running this behavior
+   * @param esBehavior        supplies the handlers, tagger, snapshot predicate, retention and signal handler
+   * @param initialState      state seen by the command handler for the first command
+   * @param initialSequenceNr sequence number before any event; the first event is reported with this value + 1
+   * @param onEvent           called with (event, sequence number, tags) for each persisted event
+   * @param onSnapshot        called with (state, sequence number) when a snapshot is requested
+   */
+  private class WrappedEventSourcedBehavior[Command, Event, State](
+      context: ActorContext[Command],
+      esBehavior: EventSourcedBehaviorImpl[Command, Event, State],
+      initialState: State,
+      initialSequenceNr: Long,
+      onEvent: (Event, Long, Set[String]) => Unit,
+      onSnapshot: (State, Long) => Unit)
+      extends AbstractBehavior[Command](context)
+      with WithSeqNrAccessible {
+    import pekko.persistence.typed.{ EventSourcedSignal, RecoveryCompleted, SnapshotCompleted, SnapshotMetadata }
+    import pekko.persistence.typed.internal._
+
+    override def currentSequenceNumber: Long = sequenceNr
+
+    private def commandHandler = esBehavior.commandHandler
+    private def eventHandler = esBehavior.eventHandler
+    private def tagger = esBehavior.tagger
+    private def snapshotWhen = esBehavior.snapshotWhen
+    private def retention = esBehavior.retention
+    private def signalHandler = esBehavior.signalHandler
+
+    private var sequenceNr: Long = initialSequenceNr
+    private var state: State = initialState
+    // commands for which the handler returned a Stash effect, kept until an UnstashAll side effect is seen
+    private val stashedCommands = ListBuffer.empty[Command]
+
+    private def snapshotMetadata() =
+      SnapshotMetadata(esBehavior.persistenceId.toString, sequenceNr, System.currentTimeMillis())
+
+    // signals the handler does not cover are ignored
+    private def sendSignal(signal: EventSourcedSignal): Unit =
+      signalHandler.applyOrElse(state -> signal, doNothing)
+
+    sendSignal(RecoveryCompleted)
+
+    /**
+     * Runs `cmd` through the command handler and applies the resulting effect chain.
+     *
+     * @return `Behaviors.stopped` if a `Stop` side effect or an unexpected effect was seen; a behavior that
+     *         replays the stashed commands if `UnstashAll` was requested and commands are stashed; otherwise
+     *         this behavior
+     */
+    override def onMessage(cmd: Command): Behavior[Command] = {
+      var shouldSnapshot = false
+      var shouldUnstash = false
+      var shouldStop = false
+
+      // evaluated after the event has been applied, so `state` and `sequenceNr` already include `evt`
+      def snapshotRequested(evt: Event): Boolean = {
+        val snapshotFromRetention = retention match {
+          case DisabledRetentionCriteria             => false
+          case s: SnapshotCountRetentionCriteriaImpl => s.snapshotWhen(sequenceNr)
+          case unexpected =>
+            throw new IllegalStateException(s"Unexpected retention criteria: $unexpected")
+        }
+
+        snapshotFromRetention || snapshotWhen(state, evt, sequenceNr)
+      }
+
+      def persistEvent(evt: Event): Unit = {
+        sequenceNr += 1
+        onEvent(evt, sequenceNr, tagger(evt))
+        state = eventHandler(state, evt)
+        shouldSnapshot = shouldSnapshot || snapshotRequested(evt)
+      }
+
+      // Unwinds composite effects, accumulating their side effects, then applies the innermost effect
+      @tailrec
+      def applyEffects(curEffect: EffectImpl[Event, State], sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+        curEffect match {
+          case CompositeEffect(eff: EffectImpl[Event, State], se) =>
+            applyEffects(eff, se ++ sideEffects)
+
+          case Persist(event) =>
+            persistEvent(event)
+            sideEffect(sideEffects)
+
+          case PersistAll(events) =>
+            events.foreach(persistEvent)
+            sideEffect(sideEffects)
+
+          // From outside of the behavior, these are equivalent: no state update
+          case _: PersistNothing.type | _: Unhandled.type =>
+            sideEffect(sideEffects)
+
+          case _: Stash.type =>
+            stashedCommands.append(cmd)
+            sideEffect(sideEffects)
+
+          case _ =>
+            context.log.error("Unexpected effect {}, stopping", curEffect)
+            // This function returns Unit, so evaluating `Behaviors.stopped` here would be discarded and the
+            // behavior would keep running; raise the flag that onMessage checks once effects are applied.
+            shouldStop = true
+        }
+
+      // Stop and UnstashAll only raise flags; callbacks run immediately against the current state
+      def sideEffect(sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+        sideEffects.iterator.foreach { effect =>
+          effect match {
+            case _: Stop.type       => shouldStop = true
+            case _: UnstashAll.type => shouldUnstash = true
+            case cb: Callback[_]    => cb.sideEffect(state)
+          }
+        }
+
+      applyEffects(commandHandler(state, cmd).asInstanceOf[EffectImpl[Event, State]], Nil)
+
+      if (shouldSnapshot) {
+        onSnapshot(state, sequenceNr)
+        sendSignal(SnapshotCompleted(snapshotMetadata()))
+      }
+
+      if (shouldStop) Behaviors.stopped
+      else if (shouldUnstash && stashedCommands.nonEmpty) {
+        val numStashed = stashedCommands.length
+        val thisWrappedBehavior = this
+
+        // Move the stashed commands into a stash buffer sized to hold them, clear the local buffer,
+        // then unstash everything into this behavior
+        Behaviors.setup { _ =>
+          Behaviors.withStash(numStashed) { stash =>
+            stashedCommands.foreach { sc =>
+              stash.stash(sc)
+              ()
+            }
+
+            stashedCommands.remove(0, numStashed)
+            stash.unstashAll(thisWrappedBehavior)
+          }
+        }
+      } else this
+    }
+  }
+
+  /**
+   * In-memory stand-in for a `DurableStateBehaviorImpl`.
+   *
+   * Commands are run through the wrapped behavior's command handler; each persisted state replaces the
+   * current one and is reported through `onPersist`.  `RecoveryCompleted` is signalled once, when the
+   * instance is constructed.
+   *
+   * @param context      context of the actor running this behavior
+   * @param dsBehavior   supplies the command handler, signal handler and tag
+   * @param initialState state seen by the command handler for the first command
+   * @param onPersist    called with (state, revision, tag) each time the state is persisted; the revision
+   *                     starts at 1 for the first persist
+   */
+  private class WrappedDurableStateBehavior[Command, State](
+      context: ActorContext[Command],
+      dsBehavior: DurableStateBehaviorImpl[Command, State],
+      initialState: State,
+      onPersist: (State, Long, String) => Unit)
+      extends AbstractBehavior[Command](context)
+      with WithRevisionAccessible {
+
+    import pekko.persistence.typed.state.{ DurableStateSignal, RecoveryCompleted }
+    import pekko.persistence.typed.state.internal._
+
+    override def currentRevision: Long = sequenceNr
+
+    private def commandHandler = dsBehavior.commandHandler
+    private def signalHandler = dsBehavior.signalHandler
+    private val tag = dsBehavior.tag
+
+    private var sequenceNr: Long = 0
+    private var state: State = initialState
+    // commands for which the handler returned a Stash effect, kept until an UnstashAll side effect is seen
+    private val stashedCommands = ListBuffer.empty[Command]
+
+    // signals the handler does not cover are ignored
+    private def sendSignal(signal: DurableStateSignal): Unit =
+      signalHandler.applyOrElse(state -> signal, doNothing)
+
+    sendSignal(RecoveryCompleted)
+
+    /**
+     * Runs `cmd` through the command handler and applies the resulting effect chain.
+     *
+     * @return `Behaviors.stopped` if a `Stop` side effect or an unexpected effect was seen; a behavior that
+     *         replays the stashed commands if `UnstashAll` was requested and commands are stashed; otherwise
+     *         this behavior
+     */
+    override def onMessage(cmd: Command): Behavior[Command] = {
+      var shouldUnstash = false
+      var shouldStop = false
+
+      def persistState(st: State): Unit = {
+        sequenceNr += 1
+        onPersist(st, sequenceNr, tag)
+        state = st
+      }
+
+      // Unwinds composite effects, accumulating their side effects, then applies the innermost effect
+      @tailrec
+      def applyEffects(curEffect: EffectImpl[State], sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+        curEffect match {
+          case CompositeEffect(eff: EffectImpl[_], se) =>
+            applyEffects(eff.asInstanceOf[EffectImpl[State]], se ++ sideEffects)
+
+          case Persist(st) =>
+            persistState(st)
+            sideEffect(sideEffects)
+
+          case _: PersistNothing.type | _: Unhandled.type =>
+            sideEffect(sideEffects)
+
+          case _: Stash.type =>
+            stashedCommands.append(cmd)
+            sideEffect(sideEffects)
+
+          case _ =>
+            context.log.error("Unexpected effect, stopping")
+            // This function returns Unit, so evaluating `Behaviors.stopped` here would be discarded and the
+            // behavior would keep running; raise the flag that onMessage checks once effects are applied.
+            shouldStop = true
+        }
+
+      // Stop and UnstashAll only raise flags; callbacks run immediately against the current state
+      def sideEffect(sideEffects: immutable.Seq[SideEffect[State]]): Unit =
+        sideEffects.iterator.foreach { effect =>
+          effect match {
+            case _: Stop.type       => shouldStop = true
+            case _: UnstashAll.type => shouldUnstash = true
+            case cb: Callback[_]    => cb.sideEffect(state)
+          }
+        }
+
+      applyEffects(commandHandler(state, cmd).asInstanceOf[EffectImpl[State]], Nil)
+
+      if (shouldStop) Behaviors.stopped
+      else if (shouldUnstash && stashedCommands.nonEmpty) {
+        val numStashed = stashedCommands.length
+        val thisWrappedBehavior = this
+
+        // Move the stashed commands into a stash buffer sized to hold them, clear the local buffer,
+        // then unstash everything into this behavior
+        Behaviors.setup { _ =>
+          Behaviors.withStash(numStashed) { stash =>
+            stashedCommands.foreach { sc =>
+              stash.stash(sc)
+              () // explicit discard
+            }
+            stashedCommands.remove(0, numStashed)
+            stash.unstashAll(thisWrappedBehavior)
+          }
+        }
+      } else this
+    }
+  }
+}
+
+/**
+ * INTERNAL API
+ *
+ * Queue-backed recorder of persistence effects.  Each effect is stored as a
+ * tuple of (persisted object, sequence number, tags).  The views returned by
+ * `asScala` and `asJava` consume from the same underlying queue.
+ */
+@InternalApi
+private[pekko] class PersistenceProbeImpl[T] {
+  type Element = (T, Long, Set[String])
+
+  val queue = new ConcurrentLinkedQueue[Element]()
+
+  /** Append a persistence effect to the tail of the queue. */
+  def persist(elem: Element): Unit = { queue.offer(elem); () }
+
+  /**
+   * Remove and return the oldest effect in the queue.
+   * Throws an AssertionError if the queue is empty.
+   */
+  def rawExtract(): Element =
+    queue.poll() match {
+      case null => throw new AssertionError("No persistence effects in probe")
+      case elem => elem
+    }
+
+  /** A scaladsl view over this probe's queue. */
+  def asScala: scaladsl.PersistenceProbe[T] =
+    new scaladsl.PersistenceProbe[T] {
+      import scaladsl.{ PersistenceEffect, PersistenceProbe }
+
+      def drain(): Seq[PersistenceEffect[T]] = {
+        // effects are prepended while polling, hence the final reverse to
+        // restore oldest-first order
+        @annotation.tailrec
+        def iter(acc: List[PersistenceEffect[T]]): List[PersistenceEffect[T]] 
= {
+          val elem = queue.poll()
+          if (elem == null) acc else iter(persistenceEffect(elem) :: acc)
+        }
+
+        iter(Nil).reverse
+      }
+
+      def extract(): PersistenceEffect[T] = persistenceEffect(rawExtract())
+
+      def expectPersistedType[S <: T: ClassTag](): PersistenceEffect[S] =
+        rawExtract() match {
+          case (obj: S, sequenceNr, tags) => PersistenceEffect(obj, 
sequenceNr, tags)
+          case (extracted, _, _)          =>
+            throw new AssertionError(
+              s"Expected object of type 
[${implicitly[ClassTag[S]].runtimeClass.getName}] to be persisted, " +
+              s"but actual was of type [${extracted.getClass.getName}]")
+        }
+
+      def hasEffects: Boolean = !queue.isEmpty
+
+      def expectPersisted(obj: T): PersistenceProbe[T] =
+        rawExtract() match {
+          case (persistedObj, _, _) if obj == persistedObj => this
+          case (persistedObj, _, _)                        =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, 
but actual was [$persistedObj]")
+        }
+
+      // The cases below are ordered so that the failure message names only
+      // the part (tag, object, or both) which did not match.
+      def expectPersisted(obj: T, tag: String): PersistenceProbe[T] =
+        rawExtract() match {
+          case (persistedObj, _, tags) if (obj == persistedObj) && (tags(tag)) 
=> this
+
+          case (persistedObj, _, tags) if obj == persistedObj =>
+            throw new AssertionError(
+              s"Expected persistence with tag [$tag], but actual tags were 
[${tags.mkString(",")}]")
+
+          case (persistedObj, _, tags) if tags(tag) =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, 
but actual was [$persistedObj]")
+
+          case (persistedObj, _, tags) =>
+            throw new AssertionError(
+              s"Expected object [$obj] to be persisted with tag [$tag], " +
+              s"but actual object was [$persistedObj] with tags 
[${tags.mkString(",")}]")
+        }
+
+      def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T] =
+        rawExtract() match {
+          case (persistedObj, _, persistedTags) if (obj == persistedObj) && 
(tags == persistedTags) => this
+          case (persistedObj, _, persistedTags) if obj == persistedObj         
                     =>
+            val unexpected = persistedTags.diff(tags)
+            val notPersistedWith = tags.diff(persistedTags)
+
+            throw new AssertionError(
+              s"Expected persistence with [${tags.mkString(",")}], " +
+              s"but saw unexpected actual tags [${unexpected.mkString(",")}] 
and " +
+              s"did not see actual tags [${notPersistedWith.mkString(",")}]")
+
+          case (persistedObj, _, persistedTags) if tags == persistedTags =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, 
but actual was [$persistedObj]")
+
+          case (persistedObj, _, persistedTags) =>
+            throw new AssertionError(
+              s"Expected object [$obj] to be persisted with tags 
[${tags.mkString(",")}], " +
+              s"but actual object was [$persistedObj] with tags 
[${persistedTags.mkString(",")}]")
+        }
+
+      private def persistenceEffect(elem: Element): PersistenceEffect[T] =
+        PersistenceEffect(elem._1, elem._2, elem._3)
+    }
+
+  /** A javadsl view over this probe's queue. */
+  def asJava: javadsl.PersistenceProbe[T] =
+    new javadsl.PersistenceProbe[T] {
+      import java.util.{ List => JList, Set => JSet }
+
+      import javadsl.{ PersistenceEffect, PersistenceProbe }
+
+      def drain(): JList[PersistenceEffect[T]] = {
+        // effects are prepended while polling, hence the final reverse to
+        // restore oldest-first order
+        @annotation.tailrec
+        def iter(acc: List[PersistenceEffect[T]]): List[PersistenceEffect[T]] 
= {
+          val elem = queue.poll()
+          if (elem == null) acc else iter(persistenceEffect(elem) :: acc)
+        }
+        iter(Nil).reverse.asJava
+      }
+
+      def extract(): PersistenceEffect[T] = persistenceEffect(rawExtract())
+
+      def expectPersistedClass[S <: T](clazz: Class[S]): PersistenceEffect[S] =
+        rawExtract() match {
+          case (obj, sequenceNr, tags) if clazz.isInstance(obj) =>
+            PersistenceEffect(clazz.cast(obj), sequenceNr, tags.asJava)
+
+          case (extracted, _, _) =>
+            throw new AssertionError(
+              s"Expected object of type [${clazz.getName}] to be persisted, " +
+              s"but actual was of type [${extracted.getClass.getName}]")
+        }
+
+      def hasEffects: Boolean = !queue.isEmpty
+
+      def expectPersisted(obj: T): PersistenceProbe[T] =
+        rawExtract() match {
+          case (persistedObj, _, _) if obj == persistedObj => this
+          case (persistedObj, _, _)                        =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, 
but actual was [$persistedObj]")
+        }
+
+      // The cases below are ordered so that the failure message names only
+      // the part (tag, object, or both) which did not match.
+      def expectPersisted(obj: T, tag: String): PersistenceProbe[T] =
+        rawExtract() match {
+          case (persistedObj, _, tags) if (obj == persistedObj) && (tags(tag)) 
=> this
+
+          case (persistedObj, _, tags) if obj == persistedObj =>
+            throw new AssertionError(
+              s"Expected persistence with tag [$tag], but actual tags were 
[${tags.mkString(",")}]")
+
+          case (persistedObj, _, tags) if tags(tag) =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, 
but actual was [$persistedObj]")
+
+          case (persistedObj, _, tags) =>
+            throw new AssertionError(
+              s"Expected object [$obj] to be persisted with tag [$tag], " +
+              s"but actual object was [$persistedObj] with tags 
[${tags.mkString(",")}]")
+        }
+
+      def expectPersisted(obj: T, tags: JSet[String]): PersistenceProbe[T] = {
+        val sTags = tags.asScala
+
+        // Not sure if a Java Set after asScala-ing will compare equal to a 
Scala Set...
+        def sameTags(persistedTags: Set[String]): Boolean =
+          sTags.forall(persistedTags) && persistedTags.forall(sTags)
+
+        rawExtract() match {
+          case (persistedObj, _, persistedTags) if (obj == persistedObj) && 
sameTags(persistedTags) => this
+
+          case (persistedObj, _, persistedTags) if obj == persistedObj =>
+            val unexpected = persistedTags.diff(sTags)
+            val notPersistedWith = sTags.diff(persistedTags)
+
+            throw new AssertionError(
+              s"Expected persistence with [${sTags.mkString(",")}], " +
+              s"but saw unexpected actual tags [${unexpected.mkString(",")}] 
and " +
+              s"did not see actual tags [${notPersistedWith.mkString(",")}]")
+
+          case (persistedObj, _, persistedTags) if sameTags(persistedTags) =>
+            throw new AssertionError(s"Expected object [$obj] to be persisted, 
but actual was [$persistedObj]")
+
+          case (persistedObj, _, persistedTags) =>
+            throw new AssertionError(
+              s"Expected object [$obj] to be persisted with tags 
[${sTags.mkString(",")}], " +
+              s"but actual object was [$persistedObj] with tags 
[${persistedTags.mkString(",")}]")
+        }
+      }
+
+      private def persistenceEffect(element: Element): PersistenceEffect[T] =
+        PersistenceEffect(element._1, element._2, element._3.asJava)
+    }
+}
diff --git 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala
 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala
new file mode 100644
index 0000000000..30a87ffd41
--- /dev/null
+++ 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/javadsl/UnpersistentBehavior.scala
@@ -0,0 +1,149 @@
+package org.apache.pekko.persistence.testkit.javadsl
+
+import java.util.{ List, Set }
+
+import scala.collection.immutable.{ Set => ScalaSet }
+
+import org.apache.pekko
+import pekko.actor.testkit.typed.javadsl.BehaviorTestKit
+import pekko.actor.typed.Behavior
+import pekko.annotation.DoNotInherit
+import pekko.persistence.testkit.internal.{ PersistenceProbeImpl, Unpersistent 
}
+
+/**
+ * Factory methods to create UnpersistentBehavior instances for testing.
+ *
+ * @since 1.3.0
+ */
+object UnpersistentBehavior {
+
+  /**
+   * Given an EventSourcedBehavior, produce a non-persistent Behavior which 
synchronously publishes events and snapshots
+   *  for inspection.  State is updated as in the EventSourcedBehavior, and 
side effects are performed synchronously.  The
+   *  resulting Behavior is, contingent on the command handling, event 
handling, and side effects being compatible with the
+   *  BehaviorTestKit, testable with the BehaviorTestKit.
+   *
+   *  The returned Behavior does not intrinsically depend on configuration: it 
therefore does not serialize and
+   *  assumes an unbounded stash for commands.
+   *
+   *  @param behavior a (possibly wrapped) EventSourcedBehavior to serve as 
the basis for the unpersistent behavior
+   *  @param initialState start the unpersistent behavior with this state; if 
null, behavior's initialState will be used
+   *  @param initialSequenceNr start the unpersistent behavior with this 
sequence number; only applies if initialState is non-null
+   *  @return an UnpersistentBehavior based on an EventSourcedBehavior
+   */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command],
+      initialState: State,
+      initialSequenceNr: Long): UnpersistentBehavior[Command, Event, State] = {
+    require(initialSequenceNr >= 0, "initialSequenceNr must be at least zero")
+
+    // a null initialState becomes None, in which case the sequence number is 
+    // dropped along with it
+    val initialStateAndSequenceNr = Option(initialState).map(_ -> 
initialSequenceNr)
+    val eventProbe = new PersistenceProbeImpl[Event]
+    val snapshotProbe = new PersistenceProbeImpl[State]
+
+    val b =
+      Unpersistent.eventSourced(behavior, initialStateAndSequenceNr) {
+        (event: Event, sequenceNr: Long, tags: ScalaSet[String]) =>
+          eventProbe.persist((event, sequenceNr, tags))
+      } { (snapshot, sequenceNr) =>
+        // snapshots are recorded without tags
+        snapshotProbe.persist((snapshot, sequenceNr, ScalaSet.empty))
+      }
+
+    new UnpersistentBehavior(b, eventProbe.asJava, snapshotProbe.asJava)
+  }
+
+  /**
+   * As the three-argument `fromEventSourced`, passing a null initial state
+   *  and an initial sequence number of zero.
+   */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command]): UnpersistentBehavior[Command, Event, 
State] =
+    fromEventSourced(behavior, null.asInstanceOf[State], 0)
+
+  /**
+   * Produce an UnpersistentBehavior from `behavior` via 
+   *  `Unpersistent.durableState`, publishing each persisted state (with its
+   *  version and tag) to the state probe.  The event probe of the result 
+   *  fails with an AssertionError on any use.
+   *
+   *  @param behavior the behavior to serve as the basis for the unpersistent behavior
+   *  @param initialState the state to start with; null is passed on as an empty Option
+   */
+  def fromDurableState[Command, State](
+      behavior: Behavior[Command],
+      initialState: State): UnpersistentBehavior[Command, Void, State] = {
+    val probe = new PersistenceProbeImpl[State]
+    val b =
+      Unpersistent.durableState(behavior, Option(initialState)) { (state, 
version, tag) =>
+        // an empty tag string is recorded as an empty set of tags
+        probe.persist((state, version, if (tag == "") ScalaSet.empty else 
ScalaSet(tag)))
+      }
+
+    new UnpersistentBehavior(b, noEventProbe, probe.asJava)
+  }
+
+  /** As the two-argument `fromDurableState`, passing a null initial state. */
+  def fromDurableState[Command, State](behavior: Behavior[Command]): 
UnpersistentBehavior[Command, Void, State] =
+    fromDurableState(behavior, null.asInstanceOf[State])
+
+  /** Event probe for durable state behaviors: every method throws an AssertionError. */
+  private val noEventProbe: PersistenceProbe[Void] =
+    new PersistenceProbe[Void] {
+      def drain(): List[PersistenceEffect[Void]] =
+        // could return an empty list, but the intent is that any use of this 
probe should fail the test
+        boom()
+
+      def extract(): PersistenceEffect[Void] = boom()
+      def expectPersistedClass[S <: Void](clazz: Class[S]): 
PersistenceEffect[S] = boom()
+      def hasEffects: Boolean = boom()
+      def expectPersisted(obj: Void): PersistenceProbe[Void] = boom()
+      def expectPersisted(obj: Void, tag: String): PersistenceProbe[Void] = 
boom()
+      def expectPersisted(obj: Void, tags: Set[String]): 
PersistenceProbe[Void] = boom()
+
+      private def boom() = throw new AssertionError("No events were persisted")
+    }
+}
+
+/**
+ * Bundles an unpersistent Behavior with the probes which receive its
+ * persistence effects.  Instances are created through the factory methods
+ * on the companion object.
+ */
+final class UnpersistentBehavior[Command, Event, State] private (
+    behavior: Behavior[Command],
+    eventProbe: PersistenceProbe[Event],
+    stateProbe: PersistenceProbe[State]) {
+
+  // created on first access of getBehaviorTestKit()
+  private lazy val testKit: BehaviorTestKit[Command] = BehaviorTestKit.create(behavior)
+
+  /** The unpersistent behavior itself */
+  def getBehavior(): Behavior[Command] = behavior
+
+  /** A BehaviorTestKit for the unpersistent behavior; the same instance on every call */
+  def getBehaviorTestKit(): BehaviorTestKit[Command] = testKit
+
+  /** Note: durable state behaviors will not publish events to this probe */
+  def getEventProbe(): PersistenceProbe[Event] = eventProbe
+
+  /** The probe receiving persisted states or snapshots */
+  def getStateProbe(): PersistenceProbe[State] = stateProbe
+
+  /** The same probe as returned by getStateProbe() */
+  def getSnapshotProbe(): PersistenceProbe[State] = getStateProbe()
+}
+
+/**
+ * A single persistence effect: the persisted object, its sequence number,
+ * and the tags (as a `java.util.Set`) it was persisted with.
+ */
+final case class PersistenceEffect[T](persistedObject: T, sequenceNr: Long, 
tags: Set[String])
+
+/**
+ * A probe holding persistence effects in the order they were recorded.
+ *
+ * Not for user extension
+ */
+@DoNotInherit
+trait PersistenceProbe[T] {
+
+  /** Collect all persistence effects from the probe and empty the probe */
+  def drain(): List[PersistenceEffect[T]]
+
+  /** Get and remove the oldest persistence effect from the probe */
+  def extract(): PersistenceEffect[T]
+
+  /**
+   * Get and remove the oldest persistence effect from the probe, failing if
+   *  the persisted object is not of the requested type
+   */
+  def expectPersistedClass[S <: T](clazz: Class[S]): PersistenceEffect[S]
+
+  /** Are there any persistence effects? */
+  def hasEffects: Boolean
+
+  /**
+   * Assert that the given object was persisted in the oldest persistence
+   *  effect and remove that persistence effect
+   */
+  def expectPersisted(obj: T): PersistenceProbe[T]
+
+  /**
+   * Assert that the given object was persisted with the given tag in the
+   *  oldest persistence effect and remove that persistence effect.  If the
+   *  persistence effect has multiple tags, only one of them has to match in
+   *  order for the assertion to succeed.
+   */
+  def expectPersisted(obj: T, tag: String): PersistenceProbe[T]
+
+  /**
+   * Assert that the given object was persisted with the given tags in the
+   *  oldest persistence effect and remove that persistence effect.  If the
+   *  persistence effect has tags which are not given, the assertion fails.
+   */
+  def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T]
+}
diff --git 
a/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala
 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala
new file mode 100644
index 0000000000..ee7d2c5876
--- /dev/null
+++ 
b/persistence-testkit/src/main/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentBehavior.scala
@@ -0,0 +1,131 @@
+package org.apache.pekko.persistence.testkit.scaladsl
+
+import scala.reflect.ClassTag
+
+import org.apache.pekko
+import pekko.actor.testkit.typed.scaladsl.BehaviorTestKit
+import pekko.actor.typed.Behavior
+import pekko.annotation.DoNotInherit
+import pekko.persistence.testkit.internal.{ PersistenceProbeImpl, Unpersistent 
}
+
+/**
+ * An unpersistent Behavior together with the probe which receives its
+ * persisted states.
+ */
+sealed trait UnpersistentBehavior[Command, State] {
+
+  /** The unpersistent behavior itself */
+  val behavior: Behavior[Command]
+
+  /** The probe receiving persisted states (or snapshots) */
+  def stateProbe: PersistenceProbe[State]
+
+  /** A BehaviorTestKit for `behavior`, created on first access */
+  lazy val behaviorTestKit: BehaviorTestKit[Command] = BehaviorTestKit[Command](behavior)
+}
+
+/**
+ * Factory methods to create UnpersistentBehavior instances for testing.
+ *
+ * @since 1.3.0
+ */
+object UnpersistentBehavior {
+
+  /**
+   * Given an EventSourcedBehavior, produce a non-persistent Behavior which 
synchronously publishes events and snapshots
+   *  for inspection.  State is updated as in the EventSourcedBehavior, and 
side effects are performed synchronously.  The
+   *  resulting Behavior is, contingent on the command handling, event 
handling, and side effects being compatible with the
+   *  BehaviorTestKit, testable with the BehaviorTestKit.
+   *
+   *  The returned Behavior does not intrinsically depend on configuration: it 
therefore does not serialize and assumes an
+   *  unbounded stash for commands.
+   */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command],
+      initialStateAndSequenceNr: Option[(State, Long)] = None): 
EventSourced[Command, Event, State] = {
+    val eventProbe = new PersistenceProbeImpl[Event]
+    val snapshotProbe = new PersistenceProbeImpl[State]
+    val resultingBehavior =
+      Unpersistent.eventSourced(behavior, initialStateAndSequenceNr) {
+        (event: Event, sequenceNr: Long, tags: Set[String]) =>
+          eventProbe.persist((event, sequenceNr, tags))
+      } { (snapshot, sequenceNr) =>
+        // snapshots are recorded without tags
+        snapshotProbe.persist((snapshot, sequenceNr, Set.empty))
+      }
+
+    EventSourced(resultingBehavior, eventProbe.asScala, snapshotProbe.asScala)
+  }
+
+  /** As `fromEventSourced` above, starting from `initialState` at sequence number zero. */
+  def fromEventSourced[Command, Event, State](
+      behavior: Behavior[Command],
+      initialState: State): EventSourced[Command, Event, State] =
+    fromEventSourced(behavior, Some(initialState -> 0L))
+
+  /**
+   * Produce a DurableState from `behavior` via `Unpersistent.durableState`,
+   *  publishing each persisted state (with its version and tag) to the
+   *  state probe.
+   */
+  def fromDurableState[Command, State](
+      behavior: Behavior[Command],
+      initialState: Option[State] = None): DurableState[Command, State] = {
+    val probe = new PersistenceProbeImpl[State]
+
+    val resultingBehavior =
+      Unpersistent.durableState(behavior, initialState) { (state, version, 
tag) =>
+        // an empty tag string is recorded as an empty set of tags
+        probe.persist((state, version, if (tag.isEmpty) Set.empty else 
Set(tag)))
+      }
+
+    DurableState(resultingBehavior, probe.asScala)
+  }
+
+  /**
+   * Result of `fromEventSourced`: the behavior plus an event probe and a
+   *  state (snapshot) probe.
+   */
+  final case class EventSourced[Command, Event, State](
+      override val behavior: Behavior[Command],
+      val eventProbe: PersistenceProbe[Event],
+      override val stateProbe: PersistenceProbe[State])
+      extends UnpersistentBehavior[Command, State] {
+    /** Run `f` with the test kit, the event probe, and the state probe. */
+    def apply(f: (BehaviorTestKit[Command], PersistenceProbe[Event], 
PersistenceProbe[State]) => Unit): Unit =
+      f(behaviorTestKit, eventProbe, stateProbe)
+
+    /** The same probe as `stateProbe` */
+    def snapshotProbe: PersistenceProbe[State] = stateProbe
+  }
+
+  /** Result of `fromDurableState`: the behavior plus a state probe. */
+  final case class DurableState[Command, State](
+      override val behavior: Behavior[Command],
+      override val stateProbe: PersistenceProbe[State])
+      extends UnpersistentBehavior[Command, State] {
+    /** Run `f` with the test kit and the state probe. */
+    def apply(f: (BehaviorTestKit[Command], PersistenceProbe[State]) => Unit): 
Unit =
+      f(behaviorTestKit, stateProbe)
+  }
+}
+
+/**
+ * A single persistence effect: the persisted object, its sequence number,
+ * and the tags it was persisted with.
+ */
+final case class PersistenceEffect[T](persistedObject: T, sequenceNr: Long, 
tags: Set[String])
+
+/**
+ * A probe holding persistence effects in the order they were recorded.
+ *
+ * Not for user extension
+ */
+@DoNotInherit
+trait PersistenceProbe[T] {
+
+  /** Collect all persistence effects from the probe and empty the probe */
+  def drain(): Seq[PersistenceEffect[T]]
+
+  /** Get and remove the oldest persistence effect from the probe */
+  def extract(): PersistenceEffect[T]
+
+  /**
+   * Get and remove the oldest persistence effect from the probe, failing if
+   *  the persisted object is not of the requested type
+   */
+  def expectPersistedType[S <: T: ClassTag](): PersistenceEffect[S]
+
+  /** Are there any persistence effects? */
+  def hasEffects: Boolean
+
+  /**
+   * Assert that the given object was persisted in the oldest persistence
+   *  effect and remove that persistence effect
+   */
+  def expectPersisted(obj: T): PersistenceProbe[T]
+
+  /**
+   * Assert that the given object was persisted with the given tag in the
+   *  oldest persistence effect and remove that persistence effect.  If the
+   *  persistence effect has multiple tags, only one of them has to match in
+   *  order for the assertion to succeed.
+   */
+  def expectPersisted(obj: T, tag: String): PersistenceProbe[T]
+
+  /**
+   * Assert that the given object was persisted with the given tags in the
+   *  oldest persistence effect and remove that persistence effect.  If the
+   *  persistence effect has tags which are not given, the assertion fails.
+   */
+  def expectPersisted(obj: T, tags: Set[String]): PersistenceProbe[T]
+}
diff --git 
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala
 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala
new file mode 100644
index 0000000000..d3d3737373
--- /dev/null
+++ 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentDurableStateSpec.scala
@@ -0,0 +1,251 @@
+package org.apache.pekko.persistence.testkit.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.{ Behavior, RecipientRef }
+import pekko.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.state.RecoveryCompleted
+import pekko.persistence.typed.state.scaladsl._
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+/** Fixture for the spec: a counter implemented as a DurableStateBehavior. */
+object UnpersistentDurableStateSpec {
+  object BehaviorUnderTest {
+    sealed trait Command
+
+    case class Add(n: Int, replyTo: RecipientRef[Done]) extends Command
+    case class AddIfLessThan(toAdd: Int, ifLessThan: Int, replyTo: 
RecipientRef[Boolean]) extends Command
+    case class AddWhenAtLeast(toAdd: Int, whenAtLeast: Int, replyTo: 
RecipientRef[Done]) extends Command
+    case class NotifyIfAtLeast(n: Int, notifyTo: RecipientRef[Done], replyTo: 
RecipientRef[Boolean]) extends Command
+    case class GetRevisionNumber(replyTo: RecipientRef[Long]) extends Command
+
+    /**
+     * @param count the current count
+     * @param notifyAfter observers keyed by the count at which they are notified
+     * @param nextNotifyAt the lowest key in notifyAfter, Int.MaxValue if there are no observers
+     */
+    case class State(count: Int, notifyAfter: Map[Int, RecipientRef[Done]], 
nextNotifyAt: Int) {
+      /**
+       * Add `n` to the count.  If the new count reaches nextNotifyAt, only
+       *  the observers with a threshold above the new count are kept and
+       *  nextNotifyAt is recomputed from them.
+       */
+      def processAdd(n: Int): State = {
+        val nextCount = count + n
+
+        if (nextCount < nextNotifyAt) copy(count = nextCount)
+        else {
+          import scala.collection.mutable
+
+          val (nextNNA, nextNotifyAfter) = {
+            var lowestNotifyAt = Int.MaxValue
+            val inProgress = mutable.Map.empty[Int, RecipientRef[Done]]
+
+            notifyAfter.keysIterator.foreach { at =>
+              if (at > nextCount) {
+                lowestNotifyAt = lowestNotifyAt.min(at)
+                inProgress += (at -> notifyAfter(at))
+              }
+            }
+
+            lowestNotifyAt -> inProgress.toMap
+          }
+
+          copy(count = nextCount, notifyAfter = nextNotifyAfter, nextNotifyAt 
= nextNNA)
+        }
+      }
+
+      /** Register `notifyTo` at threshold `at`, lowering nextNotifyAt if `at` is smaller. */
+      def addObserver(at: Int, notifyTo: RecipientRef[Done]): State = {
+        val nextNNA = nextNotifyAt.min(at)
+        val nextNotifyAfter = notifyAfter.updated(at, notifyTo)
+
+        copy(notifyAfter = nextNotifyAfter, nextNotifyAt = nextNNA)
+      }
+    }
+
+    /**
+     * The behavior under test: a DurableStateBehavior tagged "count" which
+     *  logs the recovered state and sends Done to `recoveryDone` on
+     *  RecoveryCompleted.
+     */
+    def apply(id: String, recoveryDone: RecipientRef[Done]): Behavior[Command] 
=
+      Behaviors.setup { context =>
+        context.setLoggerName(s"entity-$id")
+
+        DurableStateBehavior[Command, State](
+          persistenceId = PersistenceId.ofUniqueId(id),
+          emptyState = State(0, Map.empty, Int.MaxValue),
+          commandHandler = applyCommand(_, _, context))
+          .receiveSignal {
+            case (state, RecoveryCompleted) =>
+              context.log.debug("Recovered state for id [{}] is [{}]", id, 
state)
+              recoveryDone ! Done
+          }
+          .withTag("count")
+      }
+
+    /** Command handler: maps each command to the Effect to apply against `state`. */
+    private def applyCommand(state: State, cmd: Command, context: 
ActorContext[Command]): Effect[State] = {
+      // Persist the state after adding n, notify the observers which were
+      // dropped by the add, send the reply, then unstash.
+      def persistAdd[Reply](n: Int, replyTo: RecipientRef[Reply], reply: 
Reply): Effect[State] = {
+        val newState = state.processAdd(n)
+
+        Effect
+          .persist(newState)
+          .thenRun { nextState => // should be the same as newState, but...
+            state.notifyAfter.keysIterator
+              .filter { at =>
+                (at <= nextState.nextNotifyAt) && 
!nextState.notifyAfter.isDefinedAt(at)
+              }
+              .foreach { at =>
+                state.notifyAfter(at) ! Done
+              }
+
+            replyTo ! reply
+          }
+          .thenUnstashAll()
+      }
+
+      cmd match {
+        case Add(n, replyTo) => persistAdd(n, replyTo, Done)
+
+        case AddIfLessThan(toAdd, ifLessThan, replyTo) =>
+          if (state.count >= ifLessThan) {
+            context.log.info("Rejecting AddIfLessThan as count = {}", 
state.count)
+            Effect.none[State].thenRun(_ => replyTo ! false)
+          } else persistAdd(toAdd, replyTo, true)
+
+        case AddWhenAtLeast(toAdd, whenAtLeast, replyTo) =>
+          // stashed until a later add brings the count up to whenAtLeast
+          if (state.count < whenAtLeast) Effect.stash()
+          else persistAdd(toAdd, replyTo, Done)
+
+        case NotifyIfAtLeast(n, notifyTo, replyTo) =>
+          if (state.count >= n) {
+            Effect.none[State].thenRun { _ =>
+              notifyTo ! Done
+              replyTo ! true
+            }
+          } else if (state.notifyAfter.isDefinedAt(n)) {
+            // an observer is already registered at n: reply false
+            Effect.none[State].thenRun(_ => replyTo ! false)
+          } else {
+            Effect.persist(state.addObserver(n, notifyTo)).thenRun(_ => 
replyTo ! true)
+          }
+
+        case GetRevisionNumber(replyTo) =>
+          Effect.none[State].thenRun(_ => replyTo ! 
DurableStateBehavior.lastSequenceNumber(context))
+      }
+    }
+  }
+}
+
+/**
+ * Exercises UnpersistentBehavior.fromDurableState against BehaviorUnderTest,
+ * using the BehaviorTestKit and TestInbox.
+ */
+class UnpersistentDurableStateSpec extends AnyWordSpec with Matchers {
+  import UnpersistentDurableStateSpec._
+
+  import pekko.actor.testkit.typed.scaladsl._
+
+  import org.slf4j.event.Level
+
+  "Unpersistent DurableStateBehavior" must {
+    "generate a fail-fast behavior from a non-DurableStateBehavior" in {
+      val notDurableState =
+        Behaviors.receive[Any] { (context, msg) =>
+          context.log.info("Got message {}", msg)
+          Behaviors.same
+        }
+
+      val unpersistent = UnpersistentBehavior.fromDurableState[Any, 
Any](notDurableState)
+      // the failure is expected on first access of the lazy test kit
+      an[AssertionError] shouldBe thrownBy { unpersistent.behaviorTestKit }
+      assert(!unpersistent.stateProbe.hasEffects, "should be no persistence 
effects")
+    }
+
+    "generate a Behavior from a DurableStateBehavior and process 
RecoveryCompleted" in {
+      import BehaviorUnderTest._
+
+      val recoveryDone = TestInbox[Done]()
+      val behavior = BehaviorUnderTest("test-1", recoveryDone.ref)
+
+      // accessor-style API
+      val unpersistent = UnpersistentBehavior.fromDurableState[Command, 
State](behavior)
+      val probe = unpersistent.stateProbe
+      val testkit = unpersistent.behaviorTestKit
+
+      assert(!probe.hasEffects, "should not be persistence yet")
+      recoveryDone.expectMessage(Done)
+      val logs = testkit.logEntries()
+      logs.size shouldBe 1
+      logs.head.level shouldBe Level.DEBUG
+      logs.head.message shouldBe s"Recovered state for id [test-1] is 
[${State(0, Map.empty, Int.MaxValue)}]"
+    }
+
+    "publish state changes in response to commands" in {
+      import BehaviorUnderTest._
+
+      val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val replyTo = TestInbox[Done]()
+
+      // and the more functional-style API
+      UnpersistentBehavior.fromDurableState[Command, State](behavior) { 
(testkit, probe) =>
+        testkit.run(Add(1, replyTo.ref))
+        replyTo.expectMessage(Done)
+        probe.expectPersisted(State(1, Map.empty, Int.MaxValue), tag = "count")
+        assert(!testkit.hasEffects(), "should have no actor effects")
+      }
+    }
+
+    "allow a state to be injected" in {
+      import BehaviorUnderTest._
+
+      val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val notify3 = TestInbox[Done]()
+      val initialState = State(1, Map(3 -> notify3.ref), 3)
+
+      UnpersistentBehavior.fromDurableState[Command, State](behavior, 
Some(initialState)) { (testkit, probe) =>
+        val logs = testkit.logEntries()
+
+        logs.size shouldBe 1
+        logs.head.level shouldBe Level.DEBUG
+        logs.head.message shouldBe s"Recovered state for id [test-1] is 
[$initialState]"
+        assert(!probe.hasEffects, "should be no persistence effect")
+        assert(!notify3.hasMessages, "no messages should be sent to notify3")
+
+        val replyTo = TestInbox[Done]()
+        // the injected count is 1, below whenAtLeast = 2, so this is stashed
+        testkit.run(AddWhenAtLeast(2, 2, replyTo.ref))
+        assert(!replyTo.hasMessages, "no messages should be sent now")
+        assert(!notify3.hasMessages, "no messages should be sent to notify3")
+        assert(!probe.hasEffects, "should be no persistence effect")
+        assert(!testkit.hasEffects(), "should be no testkit effects")
+
+        // takes the count to 4 (passing the observer at 3), then the
+        // unstashed AddWhenAtLeast takes it to 6
+        testkit.run(Add(3, TestInbox[Done]().ref))
+        replyTo.expectMessage(Done)
+        notify3.expectMessage(Done)
+        assert(!testkit.hasEffects(), "should be no testkit effects")
+        probe.drain() should contain theSameElementsInOrderAs Seq(
+          PersistenceEffect(State(4, Map.empty, Int.MaxValue), 1, 
Set("count")),
+          PersistenceEffect(State(6, Map.empty, Int.MaxValue), 2, 
Set("count")))
+      }
+    }
+
+    "stash and unstash properly" in {
+      import BehaviorUnderTest._
+
+      val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val replyTo1 = TestInbox[Done]()
+      val add = Add(1, TestInbox[Done]().ref)
+
+      UnpersistentBehavior.fromDurableState[Command, State](behavior) { 
(testkit, probe) =>
+        // stashes
+        testkit.run(AddWhenAtLeast(1, 1, replyTo1.ref))
+        assert(!probe.hasEffects, "should be no persistence effect")
+        assert(!replyTo1.hasMessages, "count is not yet 1")
+
+        // unstashes
+        testkit.run(add)
+        replyTo1.expectMessage(Done)
+        probe.drain() shouldNot be(empty)
+
+        // unstash but nothing in the stash
+        testkit.run(add)
+        assert(!replyTo1.hasMessages, "should not send again")
+        probe.drain() shouldNot be(empty)
+      }
+    }
+
+    "retrieve revision number" in {
+      import BehaviorUnderTest._
+
+      val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+
+      val replyTo = TestInbox[Long]()
+      UnpersistentBehavior.fromDurableState[Command, State](behavior) { 
(testkit, probe) =>
+        testkit.run(GetRevisionNumber(replyTo.ref))
+        // nothing has been persisted, so extracting from the probe fails
+        (the[AssertionError] thrownBy (probe.extract())).getMessage shouldBe 
"No persistence effects in probe"
+        replyTo.expectMessage(0)
+      }
+    }
+  }
+}
diff --git 
a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala
 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala
new file mode 100644
index 0000000000..d3d3737373
--- /dev/null
+++ 
b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/scaladsl/UnpersistentEventSourcedSpec.scala
@@ -0,0 +1,251 @@
+package org.apache.pekko.persistence.testkit.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.{ Behavior, RecipientRef }
+import pekko.actor.typed.scaladsl.{ ActorContext, Behaviors }
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.state.RecoveryCompleted
+import pekko.persistence.typed.state.scaladsl._
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+object UnpersistentDurableStateSpec {
+  object BehaviorUnderTest {
+    // Protocol accepted by the behavior under test; every command carries the ref(s) to answer on.
+    sealed trait Command
+
+    // Unconditionally adds `n` to the count, persists the new state and replies Done.
+    case class Add(n: Int, replyTo: RecipientRef[Done]) extends Command
+    // Adds `toAdd` and replies true only while count < ifLessThan; otherwise replies false
+    // without persisting anything.
+    case class AddIfLessThan(toAdd: Int, ifLessThan: Int, replyTo: 
RecipientRef[Boolean]) extends Command
+    // Stashed while count < whenAtLeast; once the count is high enough, adds `toAdd`,
+    // persists the new state and replies Done.
+    case class AddWhenAtLeast(toAdd: Int, whenAtLeast: Int, replyTo: 
RecipientRef[Done]) extends Command
+    // If count >= n: sends Done to `notifyTo` and replies true. If an observer is already
+    // registered at `n`: replies false. Otherwise persists a state with `notifyTo` registered
+    // at `n` and replies true.
+    case class NotifyIfAtLeast(n: Int, notifyTo: RecipientRef[Done], replyTo: 
RecipientRef[Boolean]) extends Command
+    // Replies with DurableStateBehavior.lastSequenceNumber for this actor; persists nothing.
+    case class GetRevisionNumber(replyTo: RecipientRef[Long]) extends Command
+
+    /**
+     * Durable state of the behavior under test.
+     *
+     * @param count        the running total
+     * @param notifyAfter  observers waiting for the count, keyed by the threshold they wait for
+     * @param nextNotifyAt lowest threshold among the waiting observers, `Int.MaxValue` when none wait
+     */
+    case class State(count: Int, notifyAfter: Map[Int, RecipientRef[Done]], nextNotifyAt: Int) {
+
+      /** State after adding `n`; observers whose threshold is no longer above the new count are dropped. */
+      def processAdd(n: Int): State = {
+        val nextCount = count + n
+
+        if (nextCount >= nextNotifyAt) {
+          // at least one threshold has been reached: keep only the observers still waiting
+          val stillWaiting = notifyAfter.filter { case (at, _) => at > nextCount }
+          val lowestWaiting = stillWaiting.keysIterator.foldLeft(Int.MaxValue)((lowest, at) => lowest.min(at))
+
+          copy(count = nextCount, notifyAfter = stillWaiting, nextNotifyAt = lowestWaiting)
+        } else copy(count = nextCount)
+      }
+
+      /** State with `notifyTo` registered at threshold `at`, replacing any observer already there. */
+      def addObserver(at: Int, notifyTo: RecipientRef[Done]): State =
+        copy(notifyAfter = notifyAfter.updated(at, notifyTo), nextNotifyAt = nextNotifyAt.min(at))
+    }
+
+    /**
+     * Builds the durable-state behavior under test.
+     *
+     * @param id           used as the unique persistence id and in the logger name
+     * @param recoveryDone receives `Done` when the `RecoveryCompleted` signal is handled
+     */
+    def apply(id: String, recoveryDone: RecipientRef[Done]): Behavior[Command] =
+      Behaviors.setup { ctx =>
+        ctx.setLoggerName(s"entity-$id")
+
+        val initial = State(0, Map.empty, Int.MaxValue)
+        val durable = DurableStateBehavior[Command, State](
+          persistenceId = PersistenceId.ofUniqueId(id),
+          emptyState = initial,
+          commandHandler = (state, cmd) => applyCommand(state, cmd, ctx))
+
+        durable
+          .receiveSignal {
+            case (recovered, RecoveryCompleted) =>
+              ctx.log.debug("Recovered state for id [{}] is [{}]", id, recovered)
+              recoveryDone ! Done
+          }
+          .withTag("count")
+      }
+
+    // Command handler of the DurableStateBehavior: maps the current state and an incoming
+    // command to the Effect to apply. `context` is used for logging and for reading the
+    // last sequence number.
+    private def applyCommand(state: State, cmd: Command, context: 
ActorContext[Command]): Effect[State] = {
+      // Persists the state resulting from adding `n`, then, in order: notifies observers that
+      // were dropped by the add, sends `reply` to `replyTo`, and unstashes all stashed commands.
+      def persistAdd[Reply](n: Int, replyTo: RecipientRef[Reply], reply: 
Reply): Effect[State] = {
+        val newState = state.processAdd(n)
+
+        Effect
+          .persist(newState)
+          .thenRun { nextState => // should be the same as newState, but...
+            // observers registered in the pre-command state that are absent from the
+            // persisted state have had their threshold reached: send them Done
+            state.notifyAfter.keysIterator
+              .filter { at =>
+                (at <= nextState.nextNotifyAt) && 
!nextState.notifyAfter.isDefinedAt(at)
+              }
+              .foreach { at =>
+                state.notifyAfter(at) ! Done
+              }
+
+            replyTo ! reply
+          }
+          .thenUnstashAll()
+      }
+
+      cmd match {
+        case Add(n, replyTo) => persistAdd(n, replyTo, Done)
+
+        case AddIfLessThan(toAdd, ifLessThan, replyTo) =>
+          // rejected adds persist nothing and reply false
+          if (state.count >= ifLessThan) {
+            context.log.info("Rejecting AddIfLessThan as count = {}", 
state.count)
+            Effect.none[State].thenRun(_ => replyTo ! false)
+          } else persistAdd(toAdd, replyTo, true)
+
+        case AddWhenAtLeast(toAdd, whenAtLeast, replyTo) =>
+          // stashed until a later persistAdd unstashes it with a high enough count
+          if (state.count < whenAtLeast) Effect.stash()
+          else persistAdd(toAdd, replyTo, Done)
+
+        case NotifyIfAtLeast(n, notifyTo, replyTo) =>
+          if (state.count >= n) {
+            // threshold already reached: notify right away, nothing to persist
+            Effect.none[State].thenRun { _ =>
+              notifyTo ! Done
+              replyTo ! true
+            }
+          } else if (state.notifyAfter.isDefinedAt(n)) {
+            // an observer is already registered at this threshold: refuse
+            Effect.none[State].thenRun(_ => replyTo ! false)
+          } else {
+            Effect.persist(state.addObserver(n, notifyTo)).thenRun(_ => 
replyTo ! true)
+          }
+
+        case GetRevisionNumber(replyTo) =>
+          Effect.none[State].thenRun(_ => replyTo ! 
DurableStateBehavior.lastSequenceNumber(context))
+      }
+    }
+  }
+}
+
+class UnpersistentDurableStateSpec extends AnyWordSpec with Matchers {
+  import UnpersistentDurableStateSpec._
+
+  import pekko.actor.testkit.typed.scaladsl._
+
+  import org.slf4j.event.Level
+
+  "Unpersistent DurableStateBehavior" must {
+    "generate a fail-fast behavior from a non-DurableStateBehavior" in {
+      // a plain logging behavior that is not a DurableStateBehavior
+      val plainBehavior = Behaviors.receive[Any] { (ctx, received) =>
+        ctx.log.info("Got message {}", received)
+        Behaviors.same
+      }
+      val wrapped = UnpersistentBehavior.fromDurableState[Any, Any](plainBehavior)
+
+      // construction succeeds; the failure surfaces when the testkit is requested
+      an[AssertionError] shouldBe thrownBy { wrapped.behaviorTestKit }
+      assert(!wrapped.stateProbe.hasEffects, "should be no persistence effects")
+    }
+
+    "generate a Behavior from a DurableStateBehavior and process RecoveryCompleted" in {
+      import BehaviorUnderTest._
+
+      val recoveryInbox = TestInbox[Done]()
+      val expectedLog = s"Recovered state for id [test-1] is [${State(0, Map.empty, Int.MaxValue)}]"
+
+      // accessor-style API: the probe and the testkit are read off the returned object
+      val unpersistent =
+        UnpersistentBehavior.fromDurableState[Command, State](BehaviorUnderTest("test-1", recoveryInbox.ref))
+      val stateProbe = unpersistent.stateProbe
+      val kit = unpersistent.behaviorTestKit
+
+      assert(!stateProbe.hasEffects, "should not be persistence yet")
+      recoveryInbox.expectMessage(Done)
+
+      // the only log entry so far is the DEBUG line written by the recovery signal handler
+      val entries = kit.logEntries()
+      entries.size shouldBe 1
+      val recoveryEntry = entries.head
+      recoveryEntry.level shouldBe Level.DEBUG
+      recoveryEntry.message shouldBe expectedLog
+    }
+
+    "publish state changes in response to commands" in {
+      import BehaviorUnderTest._
+
+      val underTest = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val addReplyTo = TestInbox[Done]()
+      val expectedState = State(1, Map.empty, Int.MaxValue)
+
+      // functional-style API: the testkit and the state probe are handed to the block
+      UnpersistentBehavior.fromDurableState[Command, State](underTest) { (kit, stateProbe) =>
+        kit.run(Add(1, addReplyTo.ref))
+
+        addReplyTo.expectMessage(Done)
+        stateProbe.expectPersisted(expectedState, tag = "count")
+        assert(!kit.hasEffects(), "should have no actor effects")
+      }
+    }
+
+    "allow a state to be injected" in {
+      import BehaviorUnderTest._
+
+      val behavior = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val notify3 = TestInbox[Done]()
+      // injected state: count is 1, with one observer waiting for the count to reach 3
+      val initialState = State(1, Map(3 -> notify3.ref), 3)
+
+      UnpersistentBehavior.fromDurableState[Command, State](behavior, 
Some(initialState)) { (testkit, probe) =>
+        val logs = testkit.logEntries()
+
+        // recovery must log the injected state without persisting or notifying anything
+        logs.size shouldBe 1
+        logs.head.level shouldBe Level.DEBUG
+        logs.head.message shouldBe s"Recovered state for id [test-1] is 
[$initialState]"
+        assert(!probe.hasEffects, "should be no persistence effect")
+        assert(!notify3.hasMessages, "no messages should be sent to notify3")
+
+        val replyTo = TestInbox[Done]()
+        // count (1) is below the threshold (2), so this command is stashed with no visible effect
+        testkit.run(AddWhenAtLeast(2, 2, replyTo.ref))
+        assert(!replyTo.hasMessages, "no messages should be sent now")
+        assert(!notify3.hasMessages, "no messages should be sent to notify3")
+        assert(!probe.hasEffects, "should be no persistence effect")
+        assert(!testkit.hasEffects(), "should be no testkit effects")
+
+        // Add(3) takes the count to 4, which notifies the observer at 3 and unstashes the
+        // pending AddWhenAtLeast; that adds 2 more, so two states are persisted in order
+        testkit.run(Add(3, TestInbox[Done]().ref))
+        replyTo.expectMessage(Done)
+        notify3.expectMessage(Done)
+        assert(!testkit.hasEffects(), "should be no testkit effects")
+        probe.drain() should contain theSameElementsInOrderAs Seq(
+          PersistenceEffect(State(4, Map.empty, Int.MaxValue), 1, 
Set("count")),
+          PersistenceEffect(State(6, Map.empty, Int.MaxValue), 2, 
Set("count")))
+      }
+    }
+
+    "stash and unstash properly" in {
+      import BehaviorUnderTest._
+
+      // the recovery notification is irrelevant to this test, so it goes to a throwaway inbox
+      val underTest = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val stashedReplyTo = TestInbox[Done]()
+      val addOne = Add(1, TestInbox[Done]().ref)
+
+      UnpersistentBehavior.fromDurableState[Command, State](underTest) { (kit, stateProbe) =>
+        // count starts at 0, below the threshold of 1: the command is stashed
+        kit.run(AddWhenAtLeast(1, 1, stashedReplyTo.ref))
+        assert(!stateProbe.hasEffects, "should be no persistence effect")
+        assert(!stashedReplyTo.hasMessages, "count is not yet 1")
+
+        // the add brings the count to 1 and unstashes the pending command, which now replies
+        kit.run(addOne)
+        stashedReplyTo.expectMessage(Done)
+        stateProbe.drain() shouldNot be(empty)
+
+        // another add unstashes again, but the stash is empty so no further reply is sent
+        kit.run(addOne)
+        assert(!stashedReplyTo.hasMessages, "should not send again")
+        stateProbe.drain() shouldNot be(empty)
+      }
+    }
+
+    "retrieve revision number" in {
+      import BehaviorUnderTest._
+
+      val underTest = BehaviorUnderTest("test-1", TestInbox[Done]().ref)
+      val revisionInbox = TestInbox[Long]()
+
+      UnpersistentBehavior.fromDurableState[Command, State](underTest) { (kit, stateProbe) =>
+        kit.run(GetRevisionNumber(revisionInbox.ref))
+
+        // GetRevisionNumber uses Effect.none, so there is no persistence effect to extract
+        val failure = the[AssertionError] thrownBy { stateProbe.extract() }
+        failure.getMessage shouldBe "No persistence effects in probe"
+
+        // the revision reported before any persist is 0
+        revisionInbox.expectMessage(0)
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to