This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c95ddb4da Support for external transport of replicated events (#2808)
3c95ddb4da is described below

commit 3c95ddb4da8bca6afbd1d2a3a1b8e5fefd3f34d4
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Mar 28 19:11:42 2026 +0800

    Support for external transport of replicated events (#2808)
    
    Port the external transport feature for replicated events from upstream.
    This enables replicated events to be transported through external mechanisms
    (such as Projections) rather than only through direct replication.
    
    Key changes:
    - Add replyTo: Option[ActorRef[Done]] to PublishedEventImpl for ack support
    - Add lossyTransport marker to distinguish transport reliability semantics
    - Add externalReplication API in ReplicatedEventSourcing for entities using
      external transport (query plugin set to NoPlugin)
    - Update onPublishedEvent: non-lossy transports accept gaps in sequence
      numbers, lossy transports reject gaps to prevent data loss
    - Add handleExternalReplicatedEventPersist ack via SideEffect
    - Add ReplicationContextImpl.NoPlugin companion constant
    - Guard replication stream start with NoPlugin check in ReplayingEvents
    - Serialize/deserialize replyTo field via ActorRefResolver
    - Update ReplicatedEventSourcing.proto with replyTo field (field 6)
    - Fix missing event.persistenceId argument in warn log
    - Add MiMa binary compatibility excludes for new proto methods
    - Add tests: ack reply, non-lossy transport gap acceptance, replyTo
      round-trip serialization
    
    Upstream: akka/akka-core@3cbed9032ae3123acacbe17837e590eb2df43975
    Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
---
 .../ReplicatedShardingDirectReplicationSpec.scala  |   9 +-
 .../typed/ReplicatedEventPublishingSpec.scala      | 108 +++++-
 .../typed/ReplicationSnapshotSpec.scala            |   6 +-
 .../serialization/ReplicatedEventSourcing.java     | 380 ++++++++++++++++++---
 .../external-replication.excludes                  |  22 ++
 .../main/protobuf/ReplicatedEventSourcing.proto    |   1 +
 .../typed/internal/EventSourcedBehaviorImpl.scala  |   8 +-
 .../typed/internal/ReplayingEvents.scala           |   6 +-
 .../typed/internal/ReplicationSetup.scala          |   8 +
 .../pekko/persistence/typed/internal/Running.scala |  43 ++-
 .../typed/scaladsl/ReplicatedEventSourcing.scala   |  22 ++
 .../ReplicatedEventSourcingSerializer.scala        |  23 +-
 .../ReplicatedEventSourcingSerializerSpec.scala    |  16 +-
 13 files changed, 571 insertions(+), 81 deletions(-)

diff --git 
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala
 
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala
index 5187664168..e8eb44dc7b 100644
--- 
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala
+++ 
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala
@@ -54,7 +54,8 @@ class ReplicatedShardingDirectReplicationSpec extends 
ScalaTestWithActorTestKit
         1L,
         "event",
         System.currentTimeMillis(),
-        Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), 
VersionVector.empty)))
+        Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), 
VersionVector.empty)),
+        None)
       system.eventStream ! EventStream.Publish(event)
 
       replicaBProbe.receiveMessage().message should equal(event)
@@ -80,7 +81,8 @@ class ReplicatedShardingDirectReplicationSpec extends 
ScalaTestWithActorTestKit
         1L,
         "event",
         System.currentTimeMillis(),
-        Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), 
VersionVector.empty)))
+        Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), 
VersionVector.empty)),
+        None)
       system.eventStream ! EventStream.Publish(event)
 
       replicaAProbe.expectNoMessage()
@@ -104,7 +106,8 @@ class ReplicatedShardingDirectReplicationSpec extends 
ScalaTestWithActorTestKit
         1L,
         "event",
         System.currentTimeMillis(),
-        Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), 
VersionVector.empty)))
+        Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), 
VersionVector.empty)),
+        None)
       system.eventStream ! EventStream.Publish(event)
 
       replicaAProbe.expectNoMessage()
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicatedEventPublishingSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicatedEventPublishingSpec.scala
index 7faab3f63c..024ab6ddcd 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicatedEventPublishingSpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicatedEventPublishingSpec.scala
@@ -65,6 +65,31 @@ object ReplicatedEventPublishingSpec {
               },
             (state, string) => state + string))
       }
+
+    def externalReplication(entityId: String, replicaId: ReplicaId, 
allReplicas: Set[ReplicaId]): Behavior[Command] =
+      Behaviors.setup { ctx =>
+        ReplicatedEventSourcing.externalReplication(ReplicationId(EntityType, 
entityId, replicaId), allReplicas)(
+          replicationContext =>
+            EventSourcedBehavior[Command, String, Set[String]](
+              replicationContext.persistenceId,
+              Set.empty,
+              (state, command) =>
+                command match {
+                  case Add(string, replyTo) =>
+                    ctx.log.debug("Persisting [{}]", string)
+                    Effect.persist(string).thenRun { _ =>
+                      ctx.log.debug("Ack:ing [{}]", string)
+                      replyTo ! Done
+                    }
+                  case Get(replyTo) =>
+                    replyTo ! state
+                    Effect.none
+                  case Stop =>
+                    Effect.stop()
+                  case unexpected => throw new RuntimeException(s"Unexpected: 
$unexpected")
+                },
+              (state, string) => state + string))
+      }
   }
 }
 
@@ -99,7 +124,8 @@ class ReplicatedEventPublishingSpec
         1L,
         "two",
         System.currentTimeMillis(),
-        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+        None)
       actor ! MyReplicatedBehavior.Add("three", probe.ref)
       probe.expectMessage(Done)
 
@@ -107,7 +133,38 @@ class ReplicatedEventPublishingSpec
       probe.expectMessage(Set("one", "two", "three"))
     }
 
-    "ignore a published event from a replica is received but the sequence 
number is unexpected" in {
+    "reply with an ack for a published event if requested" in {
+      val id = nextEntityId()
+      val actor = spawn(MyReplicatedBehavior.externalReplication(id, DCA, 
Set(DCA, DCB)))
+      val probe = createTestProbe[Any]()
+
+      val ackProbe = createTestProbe[Done]()
+      val persistenceId = ReplicationId(EntityType, id, DCB).persistenceId
+      // a published event from another replica
+      val publishedEvent = internal.PublishedEventImpl(
+        persistenceId,
+        1L,
+        "one",
+        System.currentTimeMillis(),
+        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+        Some(ackProbe.ref))
+      actor.asInstanceOf[ActorRef[Any]] ! publishedEvent
+      ackProbe.receiveMessage()
+
+      actor ! MyReplicatedBehavior.Get(probe.ref)
+      probe.expectMessage(Set("one"))
+
+      // also if we publish it again, we ack since we have seen and persisted 
it
+      // even if we deduplicate and don't write anything
+      actor.asInstanceOf[ActorRef[Any]] ! publishedEvent
+      ackProbe.receiveMessage()
+
+      // nothing changed
+      actor ! MyReplicatedBehavior.Get(probe.ref)
+      probe.expectMessage(Set("one"))
+    }
+
+    "ignore a published event from a replica is received over a lossy 
transport when there is a gap in sequence numbers" in {
       val id = nextEntityId()
       val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB)))
       val probe = createTestProbe[Any]()
@@ -120,7 +177,8 @@ class ReplicatedEventPublishingSpec
         2L, // missing 1L
         "two",
         System.currentTimeMillis(),
-        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+        None)
       actor ! MyReplicatedBehavior.Add("three", probe.ref)
       probe.expectMessage(Done)
 
@@ -128,6 +186,32 @@ class ReplicatedEventPublishingSpec
       probe.expectMessage(Set("one", "three"))
     }
 
+    "accept a published event from a replica is received over a non-lossy 
transport when there is a gap in sequence numbers" in {
+      // scenario:
+      // DCB saw a replicated event from DCA first, so already used 1 as seq 
nr for that
+      // then does a write of its own that is now replicating over to DCA - 
DCA has not seen
+      // DCB -> 1 but should still accept the update since the transport is 
not lossy
+      val id = nextEntityId()
+      val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB)))
+      val probe = createTestProbe[Any]()
+      actor ! MyReplicatedBehavior.Add("one", probe.ref)
+      probe.expectMessage(Done)
+
+      actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
+        ReplicationId(EntityType, id, DCB).persistenceId,
+        2L, // missing 1L
+        "two",
+        System.currentTimeMillis(),
+        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+        Some(probe.ref))
+      probe.expectMessage(Done)
+      actor ! MyReplicatedBehavior.Add("three", probe.ref)
+      probe.expectMessage(Done)
+
+      actor ! MyReplicatedBehavior.Get(probe.ref)
+      probe.expectMessage(Set("one", "two", "three"))
+    }
+
     "ignore a published event from an unknown replica" in {
       val id = nextEntityId()
       val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB)))
@@ -141,7 +225,8 @@ class ReplicatedEventPublishingSpec
         1L,
         "two",
         System.currentTimeMillis(),
-        Some(new ReplicatedPublishedEventMetaData(DCC, VersionVector.empty)))
+        Some(new ReplicatedPublishedEventMetaData(DCC, VersionVector.empty)),
+        None)
       actor ! MyReplicatedBehavior.Add("three", probe.ref)
       probe.expectMessage(Done)
 
@@ -162,14 +247,16 @@ class ReplicatedEventPublishingSpec
         1L,
         "two",
         System.currentTimeMillis(),
-        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+        None)
       // simulate another published event from that replica
       actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
         ReplicationId(EntityType, id, DCB).persistenceId,
         1L,
         "two-again", // ofc this would be the same in the real world, 
different just so we can detect
         System.currentTimeMillis(),
-        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+        None)
 
       actor ! MyReplicatedBehavior.Add("three", probe.ref)
       probe.expectMessage(Done)
@@ -201,7 +288,8 @@ class ReplicatedEventPublishingSpec
         1L,
         "two",
         System.currentTimeMillis(),
-        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+        None)
 
       incarnation2 ! MyReplicatedBehavior.Add("three", probe.ref)
       probe.expectMessage(Done)
@@ -224,7 +312,8 @@ class ReplicatedEventPublishingSpec
         1L,
         "two",
         System.currentTimeMillis(),
-        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+        None)
 
       incarnationA1 ! MyReplicatedBehavior.Stop
       probe.expectTerminated(incarnationA1)
@@ -237,7 +326,8 @@ class ReplicatedEventPublishingSpec
         2L,
         "three",
         System.currentTimeMillis(),
-        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+        Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+        None)
 
       incarnationA2 ! MyReplicatedBehavior.Add("four", probe.ref)
       probe.expectMessage(Done)
diff --git 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicationSnapshotSpec.scala
 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicationSnapshotSpec.scala
index 2ef3da3f59..7165af9d39 100644
--- 
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicationSnapshotSpec.scala
+++ 
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicationSnapshotSpec.scala
@@ -97,7 +97,8 @@ class ReplicationSnapshotSpec
           1L,
           "two-again",
           System.currentTimeMillis(),
-          Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty)))
+          Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty)),
+          None)
 
         // r2 should now filter out that event if it receives it again
         r2EventProbe.expectNoMessage()
@@ -111,7 +112,8 @@ class ReplicationSnapshotSpec
           1L,
           "two-again",
           System.currentTimeMillis(),
-          Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty)))
+          Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty)),
+          None)
         r2EventProbe.expectNoMessage()
 
         val stateProbe = createTestProbe[State]()
diff --git 
a/persistence-typed/src/main/java/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcing.java
 
b/persistence-typed/src/main/java/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcing.java
index 3a14a13e8c..93018340e8 100644
--- 
a/persistence-typed/src/main/java/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcing.java
+++ 
b/persistence-typed/src/main/java/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcing.java
@@ -14,7 +14,7 @@
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
 // NO CHECKED-IN PROTOBUF GENCODE
 // source: ReplicatedEventSourcing.proto
-// Protobuf Java Version: 4.33.0
+// Protobuf Java Version: 4.34.0
 
 package org.apache.pekko.persistence.typed.serialization;
 
@@ -27,7 +27,7 @@ public final class ReplicatedEventSourcing
     
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
         
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
         /* major= */ 4,
-        /* minor= */ 33,
+        /* minor= */ 34,
         /* patch= */ 0,
         /* suffix= */ "",
         "ReplicatedEventSourcing");
@@ -55,7 +55,7 @@ public final class ReplicatedEventSourcing
       
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
           
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
           /* major= */ 4,
-          /* minor= */ 33,
+          /* minor= */ 34,
           /* patch= */ 0,
           /* suffix= */ "",
           "ORSetDeltaOp");
@@ -116,7 +116,7 @@ public final class ReplicatedEventSourcing
 
     public final 
org.apache.pekko.protobufv3.internal.Descriptors.EnumValueDescriptor
         getValueDescriptor() {
-      return getDescriptor().getValues().get(ordinal());
+      return getDescriptor().getValue(ordinal());
     }
 
     public final 
org.apache.pekko.protobufv3.internal.Descriptors.EnumDescriptor
@@ -127,8 +127,7 @@ public final class ReplicatedEventSourcing
     public static 
org.apache.pekko.protobufv3.internal.Descriptors.EnumDescriptor getDescriptor() 
{
       return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
           .getDescriptor()
-          .getEnumTypes()
-          .get(0);
+          .getEnumType(0);
     }
 
     private static final ORSetDeltaOp[] VALUES = values();
@@ -181,7 +180,7 @@ public final class ReplicatedEventSourcing
       
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
           
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
           /* major= */ 4,
-          /* minor= */ 33,
+          /* minor= */ 34,
           /* patch= */ 0,
           /* suffix= */ "",
           "Counter");
@@ -202,6 +201,12 @@ public final class ReplicatedEventSourcing
           .internal_static_Counter_descriptor;
     }
 
+    @java.lang.Override
+    public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+      return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+          .internal_static_Counter_descriptor;
+    }
+
     @java.lang.Override
     protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
         internalGetFieldAccessorTable() {
@@ -734,7 +739,7 @@ public final class ReplicatedEventSourcing
       
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
           
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
           /* major= */ 4,
-          /* minor= */ 33,
+          /* minor= */ 34,
           /* patch= */ 0,
           /* suffix= */ "",
           "CounterUpdate");
@@ -756,6 +761,12 @@ public final class ReplicatedEventSourcing
           .internal_static_CounterUpdate_descriptor;
     }
 
+    @java.lang.Override
+    public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+      return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+          .internal_static_CounterUpdate_descriptor;
+    }
+
     @java.lang.Override
     protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
         internalGetFieldAccessorTable() {
@@ -1447,7 +1458,7 @@ public final class ReplicatedEventSourcing
       
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
           
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
           /* major= */ 4,
-          /* minor= */ 33,
+          /* minor= */ 34,
           /* patch= */ 0,
           /* suffix= */ "",
           "ORSet");
@@ -1473,6 +1484,12 @@ public final class ReplicatedEventSourcing
           .internal_static_ORSet_descriptor;
     }
 
+    @java.lang.Override
+    public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+      return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+          .internal_static_ORSet_descriptor;
+    }
+
     @java.lang.Override
     protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
         internalGetFieldAccessorTable() {
@@ -1865,10 +1882,15 @@ public final class ReplicatedEventSourcing
             
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
                 2, getVvector());
       }
-      for (int i = 0; i < dots_.size(); i++) {
-        size +=
-            
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
-                3, dots_.get(i));
+
+      {
+        final int count = dots_.size();
+        for (int i = 0; i < count; i++) {
+          size +=
+              
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSizeNoTag(
+                  dots_.get(i));
+        }
+        size += 1 * count;
       }
       {
         int dataSize = 0;
@@ -1910,10 +1932,15 @@ public final class ReplicatedEventSourcing
         }
         longElementsMemoizedSerializedSize = dataSize;
       }
-      for (int i = 0; i < otherElements_.size(); i++) {
-        size +=
-            
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
-                7, otherElements_.get(i));
+
+      {
+        final int count = otherElements_.size();
+        for (int i = 0; i < count; i++) {
+          size +=
+              
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSizeNoTag(
+                  otherElements_.get(i));
+        }
+        size += 1 * count;
       }
       size += getUnknownFields().getSerializedSize();
       memoizedSize = size;
@@ -3709,7 +3736,7 @@ public final class ReplicatedEventSourcing
       
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
           
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
           /* major= */ 4,
-          /* minor= */ 33,
+          /* minor= */ 34,
           /* patch= */ 0,
           /* suffix= */ "",
           "ORSetDeltaGroup");
@@ -3731,6 +3758,12 @@ public final class ReplicatedEventSourcing
           .internal_static_ORSetDeltaGroup_descriptor;
     }
 
+    @java.lang.Override
+    public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+      return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+          .internal_static_ORSetDeltaGroup_descriptor;
+    }
+
     @java.lang.Override
     protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
         internalGetFieldAccessorTable() {
@@ -3794,7 +3827,7 @@ public final class ReplicatedEventSourcing
         
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
             
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
             /* major= */ 4,
-            /* minor= */ 33,
+            /* minor= */ 34,
             /* patch= */ 0,
             /* suffix= */ "",
             "Entry");
@@ -3815,6 +3848,12 @@ public final class ReplicatedEventSourcing
             .internal_static_ORSetDeltaGroup_Entry_descriptor;
       }
 
+      @java.lang.Override
+      public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+        return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+            .internal_static_ORSetDeltaGroup_Entry_descriptor;
+      }
+
       @java.lang.Override
       protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
           internalGetFieldAccessorTable() {
@@ -4701,10 +4740,15 @@ public final class ReplicatedEventSourcing
       if (size != -1) return size;
 
       size = 0;
-      for (int i = 0; i < entries_.size(); i++) {
-        size +=
-            
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
-                1, entries_.get(i));
+
+      {
+        final int count = entries_.size();
+        for (int i = 0; i < count; i++) {
+          size +=
+              
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSizeNoTag(
+                  entries_.get(i));
+        }
+        size += 1 * count;
       }
       size += getUnknownFields().getSerializedSize();
       memoizedSize = size;
@@ -5486,7 +5530,7 @@ public final class ReplicatedEventSourcing
       
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
           
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
           /* major= */ 4,
-          /* minor= */ 33,
+          /* minor= */ 34,
           /* patch= */ 0,
           /* suffix= */ "",
           "VersionVector");
@@ -5508,6 +5552,12 @@ public final class ReplicatedEventSourcing
           .internal_static_VersionVector_descriptor;
     }
 
+    @java.lang.Override
+    public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+      return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+          .internal_static_VersionVector_descriptor;
+    }
+
     @java.lang.Override
     protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
         internalGetFieldAccessorTable() {
@@ -5572,7 +5622,7 @@ public final class ReplicatedEventSourcing
         
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
             
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
             /* major= */ 4,
-            /* minor= */ 33,
+            /* minor= */ 34,
             /* patch= */ 0,
             /* suffix= */ "",
             "Entry");
@@ -5593,6 +5643,12 @@ public final class ReplicatedEventSourcing
             .internal_static_VersionVector_Entry_descriptor;
       }
 
+      @java.lang.Override
+      public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+        return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+            .internal_static_VersionVector_Entry_descriptor;
+      }
+
       @java.lang.Override
       protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
           internalGetFieldAccessorTable() {
@@ -6390,10 +6446,15 @@ public final class ReplicatedEventSourcing
       if (size != -1) return size;
 
       size = 0;
-      for (int i = 0; i < entries_.size(); i++) {
-        size +=
-            
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
-                1, entries_.get(i));
+
+      {
+        final int count = entries_.size();
+        for (int i = 0; i < count; i++) {
+          size +=
+              
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSizeNoTag(
+                  entries_.get(i));
+        }
+        size += 1 * count;
       }
       size += getUnknownFields().getSerializedSize();
       memoizedSize = size;
@@ -7212,7 +7273,7 @@ public final class ReplicatedEventSourcing
       
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
           
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
           /* major= */ 4,
-          /* minor= */ 33,
+          /* minor= */ 34,
           /* patch= */ 0,
           /* suffix= */ "",
           "ReplicatedEventMetadata");
@@ -7234,6 +7295,12 @@ public final class ReplicatedEventSourcing
           .internal_static_ReplicatedEventMetadata_descriptor;
     }
 
+    @java.lang.Override
+    public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+      return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+          .internal_static_ReplicatedEventMetadata_descriptor;
+    }
+
     @java.lang.Override
     protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
         internalGetFieldAccessorTable() {
@@ -8377,7 +8444,7 @@ public final class ReplicatedEventSourcing
       
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
           
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
           /* major= */ 4,
-          /* minor= */ 33,
+          /* minor= */ 34,
           /* patch= */ 0,
           /* suffix= */ "",
           "ReplicatedSnapshotMetadata");
@@ -8399,6 +8466,12 @@ public final class ReplicatedEventSourcing
           .internal_static_ReplicatedSnapshotMetadata_descriptor;
     }
 
+    @java.lang.Override
+    public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+      return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+          .internal_static_ReplicatedSnapshotMetadata_descriptor;
+    }
+
     @java.lang.Override
     protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
         internalGetFieldAccessorTable() {
@@ -8463,7 +8536,7 @@ public final class ReplicatedEventSourcing
         
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
             
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
             /* major= */ 4,
-            /* minor= */ 33,
+            /* minor= */ 34,
             /* patch= */ 0,
             /* suffix= */ "",
             "Seen");
@@ -8484,6 +8557,12 @@ public final class ReplicatedEventSourcing
             .internal_static_ReplicatedSnapshotMetadata_Seen_descriptor;
       }
 
+      @java.lang.Override
+      public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+        return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+            .internal_static_ReplicatedSnapshotMetadata_Seen_descriptor;
+      }
+
       @java.lang.Override
       protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
           internalGetFieldAccessorTable() {
@@ -9342,10 +9421,15 @@ public final class ReplicatedEventSourcing
             
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
                 1, getVersion());
       }
-      for (int i = 0; i < seenPerReplica_.size(); i++) {
-        size +=
-            
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
-                2, seenPerReplica_.get(i));
+
+      {
+        final int count = seenPerReplica_.size();
+        for (int i = 0; i < count; i++) {
+          size +=
+              
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSizeNoTag(
+                  seenPerReplica_.get(i));
+        }
+        size += 1 * count;
       }
       size += getUnknownFields().getSerializedSize();
       memoizedSize = size;
@@ -10352,7 +10436,7 @@ public final class ReplicatedEventSourcing
       
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
           
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
           /* major= */ 4,
-          /* minor= */ 33,
+          /* minor= */ 34,
           /* patch= */ 0,
           /* suffix= */ "",
           "ReplicatedPublishedEventMetaData");
@@ -10374,6 +10458,12 @@ public final class ReplicatedEventSourcing
           .internal_static_ReplicatedPublishedEventMetaData_descriptor;
     }
 
+    @java.lang.Override
+    public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+      return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+          .internal_static_ReplicatedPublishedEventMetaData_descriptor;
+    }
+
     @java.lang.Override
     protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
         internalGetFieldAccessorTable() {
@@ -11318,6 +11408,27 @@ public final class ReplicatedEventSourcing
     org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
             .ReplicatedPublishedEventMetaDataOrBuilder
         getMetadataOrBuilder();
+
+    /**
+     * <code>optional string replyTo = 6;</code>
+     *
+     * @return Whether the replyTo field is set.
+     */
+    boolean hasReplyTo();
+
+    /**
+     * <code>optional string replyTo = 6;</code>
+     *
+     * @return The replyTo.
+     */
+    java.lang.String getReplyTo();
+
+    /**
+     * <code>optional string replyTo = 6;</code>
+     *
+     * @return The bytes for replyTo.
+     */
+    org.apache.pekko.protobufv3.internal.ByteString getReplyToBytes();
   }
 
   /** Protobuf type {@code PublishedEvent} */
@@ -11332,7 +11443,7 @@ public final class ReplicatedEventSourcing
       
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
           
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
           /* major= */ 4,
-          /* minor= */ 33,
+          /* minor= */ 34,
           /* patch= */ 0,
           /* suffix= */ "",
           "PublishedEvent");
@@ -11346,6 +11457,7 @@ public final class ReplicatedEventSourcing
 
     private PublishedEvent() {
       persistenceId_ = "";
+      replyTo_ = "";
     }
 
     public static final 
org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
@@ -11354,6 +11466,12 @@ public final class ReplicatedEventSourcing
           .internal_static_PublishedEvent_descriptor;
     }
 
+    @java.lang.Override
+    public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor 
getDescriptorForType() {
+      return 
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+          .internal_static_PublishedEvent_descriptor;
+    }
+
     @java.lang.Override
     protected 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
         internalGetFieldAccessorTable() {
@@ -11541,6 +11659,60 @@ public final class ReplicatedEventSourcing
           : metadata_;
     }
 
+    public static final int REPLYTO_FIELD_NUMBER = 6;
+
+    @SuppressWarnings("serial")
+    private volatile java.lang.Object replyTo_ = "";
+
+    /**
+     * <code>optional string replyTo = 6;</code>
+     *
+     * @return Whether the replyTo field is set.
+     */
+    @java.lang.Override
+    public boolean hasReplyTo() {
+      return ((bitField0_ & 0x00000020) != 0);
+    }
+
+    /**
+     * <code>optional string replyTo = 6;</code>
+     *
+     * @return The replyTo.
+     */
+    @java.lang.Override
+    public java.lang.String getReplyTo() {
+      java.lang.Object ref = replyTo_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        org.apache.pekko.protobufv3.internal.ByteString bs =
+            (org.apache.pekko.protobufv3.internal.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          replyTo_ = s;
+        }
+        return s;
+      }
+    }
+
+    /**
+     * <code>optional string replyTo = 6;</code>
+     *
+     * @return The bytes for replyTo.
+     */
+    @java.lang.Override
+    public org.apache.pekko.protobufv3.internal.ByteString getReplyToBytes() {
+      java.lang.Object ref = replyTo_;
+      if (ref instanceof java.lang.String) {
+        org.apache.pekko.protobufv3.internal.ByteString b =
+            
org.apache.pekko.protobufv3.internal.ByteString.copyFromUtf8((java.lang.String) 
ref);
+        replyTo_ = b;
+        return b;
+      } else {
+        return (org.apache.pekko.protobufv3.internal.ByteString) ref;
+      }
+    }
+
     private byte memoizedIsInitialized = -1;
 
     @java.lang.Override
@@ -11584,6 +11756,9 @@ public final class ReplicatedEventSourcing
       if (((bitField0_ & 0x00000010) != 0)) {
         output.writeMessage(5, getMetadata());
       }
+      if (((bitField0_ & 0x00000020) != 0)) {
+        
org.apache.pekko.protobufv3.internal.GeneratedMessage.writeString(output, 6, 
replyTo_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -11616,6 +11791,10 @@ public final class ReplicatedEventSourcing
             
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
                 5, getMetadata());
       }
+      if (((bitField0_ & 0x00000020) != 0)) {
+        size +=
+            
org.apache.pekko.protobufv3.internal.GeneratedMessage.computeStringSize(6, 
replyTo_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSize = size;
       return size;
@@ -11658,6 +11837,10 @@ public final class ReplicatedEventSourcing
       if (hasMetadata()) {
         if (!getMetadata().equals(other.getMetadata())) return false;
       }
+      if (hasReplyTo() != other.hasReplyTo()) return false;
+      if (hasReplyTo()) {
+        if (!getReplyTo().equals(other.getReplyTo())) return false;
+      }
       if (!getUnknownFields().equals(other.getUnknownFields())) return false;
       return true;
     }
@@ -11690,6 +11873,10 @@ public final class ReplicatedEventSourcing
         hash = (37 * hash) + METADATA_FIELD_NUMBER;
         hash = (53 * hash) + getMetadata().hashCode();
       }
+      if (hasReplyTo()) {
+        hash = (37 * hash) + REPLYTO_FIELD_NUMBER;
+        hash = (53 * hash) + getReplyTo().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -11882,6 +12069,7 @@ public final class ReplicatedEventSourcing
           metadataBuilder_.dispose();
           metadataBuilder_ = null;
         }
+        replyTo_ = "";
         return this;
       }
 
@@ -11948,6 +12136,10 @@ public final class ReplicatedEventSourcing
           result.metadata_ = metadataBuilder_ == null ? metadata_ : 
metadataBuilder_.build();
           to_bitField0_ |= 0x00000010;
         }
+        if (((from_bitField0_ & 0x00000020) != 0)) {
+          result.replyTo_ = replyTo_;
+          to_bitField0_ |= 0x00000020;
+        }
         result.bitField0_ |= to_bitField0_;
       }
 
@@ -11990,6 +12182,11 @@ public final class ReplicatedEventSourcing
         if (other.hasMetadata()) {
           mergeMetadata(other.getMetadata());
         }
+        if (other.hasReplyTo()) {
+          replyTo_ = other.replyTo_;
+          bitField0_ |= 0x00000020;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         onChanged();
         return this;
@@ -12058,6 +12255,12 @@ public final class ReplicatedEventSourcing
                   bitField0_ |= 0x00000010;
                   break;
                 } // case 42
+              case 50:
+                {
+                  replyTo_ = input.readBytes();
+                  bitField0_ |= 0x00000020;
+                  break;
+                } // case 50
               default:
                 {
                   if (!super.parseUnknownField(input, extensionRegistry, tag)) 
{
@@ -12549,6 +12752,98 @@ public final class ReplicatedEventSourcing
         return metadataBuilder_;
       }
 
+      private java.lang.Object replyTo_ = "";
+
+      /**
+       * <code>optional string replyTo = 6;</code>
+       *
+       * @return Whether the replyTo field is set.
+       */
+      public boolean hasReplyTo() {
+        return ((bitField0_ & 0x00000020) != 0);
+      }
+
+      /**
+       * <code>optional string replyTo = 6;</code>
+       *
+       * @return The replyTo.
+       */
+      public java.lang.String getReplyTo() {
+        java.lang.Object ref = replyTo_;
+        if (!(ref instanceof java.lang.String)) {
+          org.apache.pekko.protobufv3.internal.ByteString bs =
+              (org.apache.pekko.protobufv3.internal.ByteString) ref;
+          java.lang.String s = bs.toStringUtf8();
+          if (bs.isValidUtf8()) {
+            replyTo_ = s;
+          }
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+
+      /**
+       * <code>optional string replyTo = 6;</code>
+       *
+       * @return The bytes for replyTo.
+       */
+      public org.apache.pekko.protobufv3.internal.ByteString getReplyToBytes() 
{
+        java.lang.Object ref = replyTo_;
+        if (ref instanceof String) {
+          org.apache.pekko.protobufv3.internal.ByteString b =
+              
org.apache.pekko.protobufv3.internal.ByteString.copyFromUtf8((java.lang.String) 
ref);
+          replyTo_ = b;
+          return b;
+        } else {
+          return (org.apache.pekko.protobufv3.internal.ByteString) ref;
+        }
+      }
+
+      /**
+       * <code>optional string replyTo = 6;</code>
+       *
+       * @param value The replyTo to set.
+       * @return This builder for chaining.
+       */
+      public Builder setReplyTo(java.lang.String value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        replyTo_ = value;
+        bitField0_ |= 0x00000020;
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>optional string replyTo = 6;</code>
+       *
+       * @return This builder for chaining.
+       */
+      public Builder clearReplyTo() {
+        replyTo_ = getDefaultInstance().getReplyTo();
+        bitField0_ = (bitField0_ & ~0x00000020);
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>optional string replyTo = 6;</code>
+       *
+       * @param value The bytes for replyTo to set.
+       * @return This builder for chaining.
+       */
+      public Builder 
setReplyToBytes(org.apache.pekko.protobufv3.internal.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        replyTo_ = value;
+        bitField0_ |= 0x00000020;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PublishedEvent)
     }
 
@@ -12661,7 +12956,7 @@ public final class ReplicatedEventSourcing
     return descriptor;
   }
 
-  private static 
org.apache.pekko.protobufv3.internal.Descriptors.FileDescriptor descriptor;
+  private static final 
org.apache.pekko.protobufv3.internal.Descriptors.FileDescriptor descriptor;
 
   static {
     java.lang.String[] descriptorData = {
@@ -12703,14 +12998,15 @@ public final class ReplicatedEventSourcing
           + "sequenceNr\030\002 \002(\003\"\\\n"
           + " ReplicatedPublishedEventMetaData\022\021\n"
           + "\treplicaId\030\001 \001(\t\022%\n\r"
-          + "versionVector\030\002 \001(\0132\016.VersionVector\"\236\001\n"
+          + "versionVector\030\002 \001(\0132\016.VersionVector\"\257\001\n"
           + "\016PublishedEvent\022\025\n"
           + "\r"
           + "persistenceId\030\001 \001(\t\022\022\n\n"
           + "sequenceNr\030\002 \001(\003\022\031\n"
           + "\007payload\030\003 \001(\0132\010.Payload\022\021\n"
           + "\ttimestamp\030\004 \001(\003\0223\n"
-          + "\010metadata\030\005 
\001(\0132!.ReplicatedPublishedEventMetaData*-\n"
+          + "\010metadata\030\005 
\001(\0132!.ReplicatedPublishedEventMetaData\022\017\n"
+          + "\007replyTo\030\006 \001(\t*-\n"
           + "\014ORSetDeltaOp\022\007\n"
           + "\003Add\020\000\022\n\n"
           + "\006Remove\020\001\022\010\n"
@@ -12815,7 +13111,7 @@ public final class ReplicatedEventSourcing
         new 
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable(
             internal_static_PublishedEvent_descriptor,
             new java.lang.String[] {
-              "PersistenceId", "SequenceNr", "Payload", "Timestamp", 
"Metadata",
+              "PersistenceId", "SequenceNr", "Payload", "Timestamp", 
"Metadata", "ReplyTo",
             });
     descriptor.resolveAllFeaturesImmutable();
     org.apache.pekko.remote.ContainerFormats.getDescriptor();
diff --git 
a/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/external-replication.excludes
 
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/external-replication.excludes
new file mode 100644
index 0000000000..dacffd5b16
--- /dev/null
+++ 
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/external-replication.excludes
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# External transport of replicated events (port of akka/akka-core@3cbed9032a)
+# New replyTo field added to PublishedEvent protobuf message
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing#PublishedEventOrBuilder.hasReplyTo")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing#PublishedEventOrBuilder.getReplyTo")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing#PublishedEventOrBuilder.getReplyToBytes")
diff --git a/persistence-typed/src/main/protobuf/ReplicatedEventSourcing.proto 
b/persistence-typed/src/main/protobuf/ReplicatedEventSourcing.proto
index 4b05c44ec2..8328a40a28 100644
--- a/persistence-typed/src/main/protobuf/ReplicatedEventSourcing.proto
+++ b/persistence-typed/src/main/protobuf/ReplicatedEventSourcing.proto
@@ -85,4 +85,5 @@ message PublishedEvent {
   optional Payload payload = 3;
   optional int64 timestamp = 4;
   optional ReplicatedPublishedEventMetaData metadata = 5;
+  optional string replyTo = 6;
 }
\ No newline at end of file
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
index ae6bb6f221..0aa0df8bed 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.annotation.nowarn
 
 import org.apache.pekko
+import pekko.Done
 import pekko.actor.typed
 import pekko.actor.typed.ActorRef
 import pekko.actor.typed.BackoffSupervisorStrategy
@@ -385,13 +386,14 @@ final class ReplicatedPublishedEventMetaData(val 
replicaId: ReplicaId, private[p
 /**
  * INTERNAL API
  */
-@InternalApi
+@InternalStableApi
 private[pekko] final case class PublishedEventImpl(
     persistenceId: PersistenceId,
     sequenceNumber: Long,
     payload: Any,
     timestamp: Long,
-    replicatedMetaData: Option[ReplicatedPublishedEventMetaData])
+    replicatedMetaData: Option[ReplicatedPublishedEventMetaData],
+    replyTo: Option[ActorRef[Done]])
     extends PublishedEvent
     with InternalProtocol {
   import scala.jdk.OptionConverters._
@@ -411,5 +413,7 @@ private[pekko] final case class PublishedEventImpl(
     case _                => this
   }
 
+  def lossyTransport: Boolean = replyTo.isEmpty
+
   override def getReplicatedMetaData: 
Optional[ReplicatedPublishedEventMetaData] = replicatedMetaData.toJava
 }
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
index 9dce502153..74fc40f86c 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
@@ -304,8 +304,10 @@ private[pekko] final class ReplayingEvents[C, E, S](
           replicationControl = Map.empty)
         val running = new 
Running(setup.setMdcPhase(PersistenceMdc.RunningCmds))
         val initialRunningState = setup.replication match {
-          case Some(replication) => startReplicationStream(setup, 
runningState, replication)
-          case None              => runningState
+          case Some(replication)
+              if replication.allReplicasAndQueryPlugins.values.forall(_ != 
ReplicationContextImpl.NoPlugin) =>
+            startReplicationStream(setup, runningState, replication)
+          case _ => runningState
         }
         setup.retention match {
           case criteria: SnapshotCountRetentionCriteriaImpl if 
criteria.snapshotEveryNEvents <= state.eventsReplayed =>
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplicationSetup.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplicationSetup.scala
index 5c4f00016b..4bbce5ef25 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplicationSetup.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplicationSetup.scala
@@ -23,6 +23,14 @@ import pekko.persistence.typed.ReplicationId
 import pekko.util.OptionVal
 import pekko.util.WallClock
 
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] object ReplicationContextImpl {
+  val NoPlugin = "no-plugin"
+}
+
 /**
  * INTERNAL API
  */
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
index 533b1b56c0..bb983d05a2 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
@@ -361,7 +361,7 @@ private[pekko] object Running {
           "Saving event [{}] from [{}] as first time",
           envelope.event.originSequenceNr,
           envelope.event.originReplica)
-        handleExternalReplicatedEventPersist(replication, envelope.event)
+        handleExternalReplicatedEventPersist(replication, envelope.event, None)
       } else {
         setup.internalLogger.debug(
           "Filtering event [{}] from [{}] as it was already seen",
@@ -382,7 +382,9 @@ private[pekko] object Running {
         case Some(replication) =>
           event.replicatedMetaData match {
             case None =>
-              setup.internalLogger.warn("Received published event for [{}] but 
with no replicated metadata, dropping")
+              setup.internalLogger.warn(
+                "Received published event for [{}] but with no replicated 
metadata, dropping",
+                event.persistenceId)
               this
             case Some(replicatedEventMetaData) =>
               onPublishedEvent(state, replication, replicatedEventMetaData, 
event)
@@ -409,6 +411,7 @@ private[pekko] object Running {
             "Ignoring published replicated event with seqNr [{}] from our own 
replica id [{}]",
             event.sequenceNumber,
             originReplicaId)
+        event.replyTo.foreach(_ ! pekko.Done) // probably won't happen
         this
       } else if (!replication.allReplicas.contains(originReplicaId)) {
         log.warnN(
@@ -417,18 +420,20 @@ private[pekko] object Running {
           replication.allReplicas.mkString(", "))
         this
       } else {
-        val expectedSequenceNumber = 
state.seenPerReplica.getOrElse(originReplicaId, 0L) + 1
-        if (expectedSequenceNumber > event.sequenceNumber) {
-          // already seen
+        val seenSequenceNr = state.seenPerReplica.getOrElse(originReplicaId, 
0L)
+        if (seenSequenceNr >= event.sequenceNumber) {
+          // already seen/deduplication
           if (log.isDebugEnabled)
             log.debugN(
-              "Ignoring published replicated event with seqNr [{}] from 
replica [{}] because it was already seen ([{}])",
+              "Ignoring published replicated event with seqNr [{}] from 
replica [{}] because it was already seen (version: {})",
               event.sequenceNumber,
               originReplicaId,
-              expectedSequenceNumber)
+              state.seenPerReplica)
+          event.replyTo.foreach(_ ! pekko.Done)
           this
-        } else if (expectedSequenceNumber != event.sequenceNumber) {
-          // gap in sequence numbers (message lost or query and direct 
replication out of sync, should heal up by itself
+        } else if (event.lossyTransport && event.sequenceNumber != 
(seenSequenceNr + 1)) {
+          // Lossy transport/opportunistic replication cannot allow gaps in 
sequence
+          // numbers (message lost or query and direct replication out of 
sync, should heal up by itself
           // once the query catches up)
           if (log.isDebugEnabled) {
             log.debugN(
@@ -436,7 +441,7 @@ private[pekko] object Running {
               "because expected replication seqNr was [{}] ",
               event.sequenceNumber,
               originReplicaId,
-              expectedSequenceNumber)
+              seenSequenceNr + 1)
           }
           this
         } else {
@@ -458,7 +463,8 @@ private[pekko] object Running {
               event.event.asInstanceOf[E],
               originReplicaId,
               event.sequenceNumber,
-              replicatedMetadata.version))
+              replicatedMetadata.version),
+            event.replyTo)
         }
 
       }
@@ -477,7 +483,8 @@ private[pekko] object Running {
 
     private def handleExternalReplicatedEventPersist(
         replication: ReplicationSetup,
-        event: ReplicatedEvent[E]): Behavior[InternalProtocol] = {
+        event: ReplicatedEvent[E],
+        ackToOnPersisted: Option[pekko.actor.typed.ActorRef[pekko.Done]]): 
Behavior[InternalProtocol] = {
       _currentSequenceNumber = state.seqNr + 1
       val isConcurrent: Boolean = event.originVersion <> state.version
       val updatedVersion = event.originVersion.merge(state.version)
@@ -499,6 +506,14 @@ private[pekko] object Running {
 
       replication.clearContext()
 
+      val sideEffects = ackToOnPersisted match {
+        case None      => Nil
+        case Some(ref) =>
+          SideEffect[S] { _ =>
+            ref ! pekko.Done
+          } :: Nil
+      }
+
       val newState2: RunningState[S, C] = internalPersist(
         setup.context,
         null,
@@ -516,7 +531,7 @@ private[pekko] object Running {
         numberOfEvents = 1,
         shouldSnapshotAfterPersist,
         shouldPublish = false,
-        Nil)
+        sideEffects)
     }
 
     private def handleEventPersist(
@@ -789,7 +804,7 @@ private[pekko] object Running {
           val meta = setup.replication.map(replication =>
             new ReplicatedPublishedEventMetaData(replication.replicaId, 
state.version))
           context.system.eventStream ! EventStream.Publish(
-            PublishedEventImpl(setup.persistenceId, p.sequenceNr, p.payload, 
p.timestamp, meta))
+            PublishedEventImpl(setup.persistenceId, p.sequenceNr, p.payload, 
p.timestamp, meta, None))
         }
 
         // only once all things are applied we can revert back
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/ReplicatedEventSourcing.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/ReplicatedEventSourcing.scala
index 400c4eec02..bed846ac93 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/ReplicatedEventSourcing.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/ReplicatedEventSourcing.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.persistence.typed.scaladsl
 
 import org.apache.pekko
 import pekko.annotation.DoNotInherit
+import pekko.annotation.InternalStableApi
 import pekko.persistence.typed.PersistenceId
 import pekko.persistence.typed.ReplicaId
 import pekko.persistence.typed.ReplicationId
@@ -124,4 +125,25 @@ object ReplicatedEventSourcing {
     eventSourcedBehaviorFactory(context).withReplication(context)
   }
 
+  /**
+   * INTERNAL API
+   *
+   * Initialize a replicated event sourced behavior.
+   *
+   * Events from each replica for the same entityId need to be passed to it by 
an external stream.
+   * Care must be taken to handle events in any order as events can happen 
concurrently at different replicas.
+   *
+   * Using a replicated event sourced behavior means there is no longer the 
single writer guarantee.
+   *
+   * The journal plugin id for the entity itself can be configured using 
withJournalPluginId after creation.
+   */
+  @InternalStableApi
+  private[pekko] def externalReplication[Command, Event, State](
+      replicationId: ReplicationId,
+      allReplicas: Set[ReplicaId])(
+      eventSourcedBehaviorFactory: ReplicationContext => 
EventSourcedBehavior[Command, Event, State])
+      : EventSourcedBehavior[Command, Event, State] = {
+    val context = new ReplicationContextImpl(replicationId, allReplicas.map(_ 
-> ReplicationContextImpl.NoPlugin).toMap)
+    eventSourcedBehaviorFactory(context).withReplication(context)
+  }
 }
diff --git 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala
 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala
index 18338f09ba..c3926dbea5 100644
--- 
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala
+++ 
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala
@@ -23,6 +23,8 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.pekko
 import pekko.actor.ExtendedActorSystem
+import pekko.actor.typed.ActorRefResolver
+import pekko.actor.typed.scaladsl.adapter.ClassicActorSystemOps
 import pekko.annotation.InternalApi
 import pekko.persistence.typed.PersistenceId
 import pekko.persistence.typed.ReplicaId
@@ -74,6 +76,9 @@ import pekko.serialization.{ BaseSerializer, 
SerializerWithStringManifest }
     with BaseSerializer {
 
   private val wrappedSupport = new WrappedPayloadSupport(system)
+  // lazy because Serializers are initialized early on. `toTyped` might then 
try to
+  // initialize the classic ActorSystemAdapter extension.
+  private lazy val resolver = ActorRefResolver(system.toTyped)
 
   private val CrdtCounterManifest = "AA"
   private val CrdtCounterUpdatedManifest = "AB"
@@ -163,9 +168,7 @@ import pekko.serialization.{ BaseSerializer, 
SerializerWithStringManifest }
       .setPayload(wrappedSupport.payloadBuilder(impl.payload))
       .setTimestamp(impl.timestamp)
 
-    (impl.replicatedMetaData match {
-      case None =>
-        builder
+    impl.replicatedMetaData match {
       case Some(m) =>
         builder.setMetadata(
           ReplicatedEventSourcing.ReplicatedPublishedEventMetaData
@@ -173,7 +176,15 @@ import pekko.serialization.{ BaseSerializer, 
SerializerWithStringManifest }
             .setReplicaId(m.replicaId.id)
             .setVersionVector(versionVectorToProto(m.version))
             .build())
-    }).build().toByteArray
+      case None =>
+    }
+
+    impl.replyTo match {
+      case Some(ref) => builder.setReplyTo(resolver.toSerializationFormat(ref))
+      case None      =>
+    }
+
+    builder.build().toByteArray
   }
 
   def publishedEventFromBinary(bytes: Array[Byte]): PublishedEventImpl = {
@@ -189,7 +200,9 @@ import pekko.serialization.{ BaseSerializer, 
SerializerWithStringManifest }
           new ReplicatedPublishedEventMetaData(
             ReplicaId(protoMeta.getReplicaId),
             versionVectorFromProto(protoMeta.getVersionVector)))
-      } else None)
+      } else None,
+      if (!p.hasReplyTo) None
+      else Some(resolver.resolveActorRef(p.getReplyTo)))
   }
 
   def counterFromBinary(bytes: Array[Byte]): Counter =
diff --git 
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializerSpec.scala
 
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializerSpec.scala
index 32d8b72fa2..8b3c76f940 100644
--- 
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializerSpec.scala
+++ 
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializerSpec.scala
@@ -49,11 +49,23 @@ class ReplicatedEventSourcingSerializerSpec extends 
ScalaTestWithActorTestKit wi
           10,
           "payload",
           1,
-          Some(new ReplicatedPublishedEventMetaData(ReplicaId("R1"), 
VersionVector.empty))),
+          Some(new ReplicatedPublishedEventMetaData(ReplicaId("R1"), 
VersionVector.empty)),
+          None),
         assertEquality = false)
 
       serializationTestKit.verifySerialization(
-        PublishedEventImpl(PersistenceId.ofUniqueId("cat"), 10, "payload", 1, 
None),
+        PublishedEventImpl(PersistenceId.ofUniqueId("cat"), 10, "payload", 1, 
None, None),
+        assertEquality = false)
+
+      // verify replyTo round-trip serialization
+      serializationTestKit.verifySerialization(
+        PublishedEventImpl(
+          PersistenceId.ofUniqueId("cat"),
+          10,
+          "payload",
+          1,
+          Some(new ReplicatedPublishedEventMetaData(ReplicaId("R1"), 
VersionVector.empty)),
+          Some(system.deadLetters)),
         assertEquality = false)
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to