This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 3c95ddb4da Support for external transport of replicated events (#2808)
3c95ddb4da is described below
commit 3c95ddb4da8bca6afbd1d2a3a1b8e5fefd3f34d4
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Mar 28 19:11:42 2026 +0800
Support for external transport of replicated events (#2808)
Port the external transport feature for replicated events from upstream.
This enables replicated events to be transported through external mechanisms
(such as Projections) rather than only through direct replication.
Key changes:
- Add replyTo: Option[ActorRef[Done]] to PublishedEventImpl for ack support
- Add lossyTransport marker to distinguish transport reliability semantics
- Add externalReplication API in ReplicatedEventSourcing for entities using
external transport (query plugin set to NoPlugin)
- Update onPublishedEvent: non-lossy transports accept gaps in sequence
numbers, lossy transports reject gaps to prevent data loss
- Add handleExternalReplicatedEventPersist ack via SideEffect
- Add ReplicationContextImpl.NoPlugin companion constant
- Guard replication stream start with NoPlugin check in ReplayingEvents
- Serialize/deserialize replyTo field via ActorRefResolver
- Update ReplicatedEventSourcing.proto with replyTo field (field 6)
- Fix missing event.persistenceId argument in warn log
- Add MiMa binary compatibility excludes for new proto methods
- Add tests: ack reply, non-lossy transport gap acceptance, replyTo
round-trip serialization
Upstream: akka/akka-core@3cbed9032ae3123acacbe17837e590eb2df43975
Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
---
.../ReplicatedShardingDirectReplicationSpec.scala | 9 +-
.../typed/ReplicatedEventPublishingSpec.scala | 108 +++++-
.../typed/ReplicationSnapshotSpec.scala | 6 +-
.../serialization/ReplicatedEventSourcing.java | 380 ++++++++++++++++++---
.../external-replication.excludes | 22 ++
.../main/protobuf/ReplicatedEventSourcing.proto | 1 +
.../typed/internal/EventSourcedBehaviorImpl.scala | 8 +-
.../typed/internal/ReplayingEvents.scala | 6 +-
.../typed/internal/ReplicationSetup.scala | 8 +
.../pekko/persistence/typed/internal/Running.scala | 43 ++-
.../typed/scaladsl/ReplicatedEventSourcing.scala | 22 ++
.../ReplicatedEventSourcingSerializer.scala | 23 +-
.../ReplicatedEventSourcingSerializerSpec.scala | 16 +-
13 files changed, 571 insertions(+), 81 deletions(-)
diff --git
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala
index 5187664168..e8eb44dc7b 100644
---
a/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala
+++
b/cluster-sharding-typed/src/test/scala/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala
@@ -54,7 +54,8 @@ class ReplicatedShardingDirectReplicationSpec extends
ScalaTestWithActorTestKit
1L,
"event",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"),
VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"),
VersionVector.empty)),
+ None)
system.eventStream ! EventStream.Publish(event)
replicaBProbe.receiveMessage().message should equal(event)
@@ -80,7 +81,8 @@ class ReplicatedShardingDirectReplicationSpec extends
ScalaTestWithActorTestKit
1L,
"event",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"),
VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"),
VersionVector.empty)),
+ None)
system.eventStream ! EventStream.Publish(event)
replicaAProbe.expectNoMessage()
@@ -104,7 +106,8 @@ class ReplicatedShardingDirectReplicationSpec extends
ScalaTestWithActorTestKit
1L,
"event",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"),
VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"),
VersionVector.empty)),
+ None)
system.eventStream ! EventStream.Publish(event)
replicaAProbe.expectNoMessage()
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicatedEventPublishingSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicatedEventPublishingSpec.scala
index 7faab3f63c..024ab6ddcd 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicatedEventPublishingSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicatedEventPublishingSpec.scala
@@ -65,6 +65,31 @@ object ReplicatedEventPublishingSpec {
},
(state, string) => state + string))
}
+
+ def externalReplication(entityId: String, replicaId: ReplicaId,
allReplicas: Set[ReplicaId]): Behavior[Command] =
+ Behaviors.setup { ctx =>
+ ReplicatedEventSourcing.externalReplication(ReplicationId(EntityType,
entityId, replicaId), allReplicas)(
+ replicationContext =>
+ EventSourcedBehavior[Command, String, Set[String]](
+ replicationContext.persistenceId,
+ Set.empty,
+ (state, command) =>
+ command match {
+ case Add(string, replyTo) =>
+ ctx.log.debug("Persisting [{}]", string)
+ Effect.persist(string).thenRun { _ =>
+ ctx.log.debug("Ack:ing [{}]", string)
+ replyTo ! Done
+ }
+ case Get(replyTo) =>
+ replyTo ! state
+ Effect.none
+ case Stop =>
+ Effect.stop()
+ case unexpected => throw new RuntimeException(s"Unexpected:
$unexpected")
+ },
+ (state, string) => state + string))
+ }
}
}
@@ -99,7 +124,8 @@ class ReplicatedEventPublishingSpec
1L,
"two",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+ None)
actor ! MyReplicatedBehavior.Add("three", probe.ref)
probe.expectMessage(Done)
@@ -107,7 +133,38 @@ class ReplicatedEventPublishingSpec
probe.expectMessage(Set("one", "two", "three"))
}
- "ignore a published event from a replica is received but the sequence
number is unexpected" in {
+ "reply with an ack for a published event if requested" in {
+ val id = nextEntityId()
+ val actor = spawn(MyReplicatedBehavior.externalReplication(id, DCA,
Set(DCA, DCB)))
+ val probe = createTestProbe[Any]()
+
+ val ackProbe = createTestProbe[Done]()
+ val persistenceId = ReplicationId(EntityType, id, DCB).persistenceId
+ // a published event from another replica
+ val publishedEvent = internal.PublishedEventImpl(
+ persistenceId,
+ 1L,
+ "one",
+ System.currentTimeMillis(),
+ Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+ Some(ackProbe.ref))
+ actor.asInstanceOf[ActorRef[Any]] ! publishedEvent
+ ackProbe.receiveMessage()
+
+ actor ! MyReplicatedBehavior.Get(probe.ref)
+ probe.expectMessage(Set("one"))
+
+ // also if we publish it again, we ack since we have seen and persisted
it
+ // even if we deduplicate and don't write anything
+ actor.asInstanceOf[ActorRef[Any]] ! publishedEvent
+ ackProbe.receiveMessage()
+
+ // nothing changed
+ actor ! MyReplicatedBehavior.Get(probe.ref)
+ probe.expectMessage(Set("one"))
+ }
+
+ "ignore a published event from a replica is received over a lossy
transport when there is a gap in sequence numbers" in {
val id = nextEntityId()
val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB)))
val probe = createTestProbe[Any]()
@@ -120,7 +177,8 @@ class ReplicatedEventPublishingSpec
2L, // missing 1L
"two",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+ None)
actor ! MyReplicatedBehavior.Add("three", probe.ref)
probe.expectMessage(Done)
@@ -128,6 +186,32 @@ class ReplicatedEventPublishingSpec
probe.expectMessage(Set("one", "three"))
}
+ "accept a published event from a replica is received over a non-lossy
transport when there is a gap in sequence numbers" in {
+ // scenario:
+ // DCB saw a replicated event from DCA first, so already used 1 as seq
nr for that
+ // then does a write of its own that is now replicating over to DCA -
DCA has not seen
+ // DCB -> 1 but should still accept the update since the transport is
not lossy
+ val id = nextEntityId()
+ val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB)))
+ val probe = createTestProbe[Any]()
+ actor ! MyReplicatedBehavior.Add("one", probe.ref)
+ probe.expectMessage(Done)
+
+ actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
+ ReplicationId(EntityType, id, DCB).persistenceId,
+ 2L, // missing 1L
+ "two",
+ System.currentTimeMillis(),
+ Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+ Some(probe.ref))
+ probe.expectMessage(Done)
+ actor ! MyReplicatedBehavior.Add("three", probe.ref)
+ probe.expectMessage(Done)
+
+ actor ! MyReplicatedBehavior.Get(probe.ref)
+ probe.expectMessage(Set("one", "two", "three"))
+ }
+
"ignore a published event from an unknown replica" in {
val id = nextEntityId()
val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB)))
@@ -141,7 +225,8 @@ class ReplicatedEventPublishingSpec
1L,
"two",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(DCC, VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(DCC, VersionVector.empty)),
+ None)
actor ! MyReplicatedBehavior.Add("three", probe.ref)
probe.expectMessage(Done)
@@ -162,14 +247,16 @@ class ReplicatedEventPublishingSpec
1L,
"two",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+ None)
// simulate another published event from that replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
ReplicationId(EntityType, id, DCB).persistenceId,
1L,
"two-again", // ofc this would be the same in the real world,
different just so we can detect
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+ None)
actor ! MyReplicatedBehavior.Add("three", probe.ref)
probe.expectMessage(Done)
@@ -201,7 +288,8 @@ class ReplicatedEventPublishingSpec
1L,
"two",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+ None)
incarnation2 ! MyReplicatedBehavior.Add("three", probe.ref)
probe.expectMessage(Done)
@@ -224,7 +312,8 @@ class ReplicatedEventPublishingSpec
1L,
"two",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+ None)
incarnationA1 ! MyReplicatedBehavior.Stop
probe.expectTerminated(incarnationA1)
@@ -237,7 +326,8 @@ class ReplicatedEventPublishingSpec
2L,
"three",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty)),
+ None)
incarnationA2 ! MyReplicatedBehavior.Add("four", probe.ref)
probe.expectMessage(Done)
diff --git
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicationSnapshotSpec.scala
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicationSnapshotSpec.scala
index 2ef3da3f59..7165af9d39 100644
---
a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicationSnapshotSpec.scala
+++
b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/ReplicationSnapshotSpec.scala
@@ -97,7 +97,8 @@ class ReplicationSnapshotSpec
1L,
"two-again",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty)),
+ None)
// r2 should now filter out that event if it receives it again
r2EventProbe.expectNoMessage()
@@ -111,7 +112,8 @@ class ReplicationSnapshotSpec
1L,
"two-again",
System.currentTimeMillis(),
- Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty)))
+ Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty)),
+ None)
r2EventProbe.expectNoMessage()
val stateProbe = createTestProbe[State]()
diff --git
a/persistence-typed/src/main/java/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcing.java
b/persistence-typed/src/main/java/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcing.java
index 3a14a13e8c..93018340e8 100644
---
a/persistence-typed/src/main/java/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcing.java
+++
b/persistence-typed/src/main/java/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcing.java
@@ -14,7 +14,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// NO CHECKED-IN PROTOBUF GENCODE
// source: ReplicatedEventSourcing.proto
-// Protobuf Java Version: 4.33.0
+// Protobuf Java Version: 4.34.0
package org.apache.pekko.persistence.typed.serialization;
@@ -27,7 +27,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"ReplicatedEventSourcing");
@@ -55,7 +55,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"ORSetDeltaOp");
@@ -116,7 +116,7 @@ public final class ReplicatedEventSourcing
public final
org.apache.pekko.protobufv3.internal.Descriptors.EnumValueDescriptor
getValueDescriptor() {
- return getDescriptor().getValues().get(ordinal());
+ return getDescriptor().getValue(ordinal());
}
public final
org.apache.pekko.protobufv3.internal.Descriptors.EnumDescriptor
@@ -127,8 +127,7 @@ public final class ReplicatedEventSourcing
public static
org.apache.pekko.protobufv3.internal.Descriptors.EnumDescriptor getDescriptor()
{
return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
.getDescriptor()
- .getEnumTypes()
- .get(0);
+ .getEnumType(0);
}
private static final ORSetDeltaOp[] VALUES = values();
@@ -181,7 +180,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"Counter");
@@ -202,6 +201,12 @@ public final class ReplicatedEventSourcing
.internal_static_Counter_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_Counter_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -734,7 +739,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"CounterUpdate");
@@ -756,6 +761,12 @@ public final class ReplicatedEventSourcing
.internal_static_CounterUpdate_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_CounterUpdate_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -1447,7 +1458,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"ORSet");
@@ -1473,6 +1484,12 @@ public final class ReplicatedEventSourcing
.internal_static_ORSet_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_ORSet_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -1865,10 +1882,15 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
2, getVvector());
}
- for (int i = 0; i < dots_.size(); i++) {
- size +=
-
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
- 3, dots_.get(i));
+
+ {
+ final int count = dots_.size();
+ for (int i = 0; i < count; i++) {
+ size +=
+
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSizeNoTag(
+ dots_.get(i));
+ }
+ size += 1 * count;
}
{
int dataSize = 0;
@@ -1910,10 +1932,15 @@ public final class ReplicatedEventSourcing
}
longElementsMemoizedSerializedSize = dataSize;
}
- for (int i = 0; i < otherElements_.size(); i++) {
- size +=
-
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
- 7, otherElements_.get(i));
+
+ {
+ final int count = otherElements_.size();
+ for (int i = 0; i < count; i++) {
+ size +=
+
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSizeNoTag(
+ otherElements_.get(i));
+ }
+ size += 1 * count;
}
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
@@ -3709,7 +3736,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"ORSetDeltaGroup");
@@ -3731,6 +3758,12 @@ public final class ReplicatedEventSourcing
.internal_static_ORSetDeltaGroup_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_ORSetDeltaGroup_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -3794,7 +3827,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"Entry");
@@ -3815,6 +3848,12 @@ public final class ReplicatedEventSourcing
.internal_static_ORSetDeltaGroup_Entry_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_ORSetDeltaGroup_Entry_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -4701,10 +4740,15 @@ public final class ReplicatedEventSourcing
if (size != -1) return size;
size = 0;
- for (int i = 0; i < entries_.size(); i++) {
- size +=
-
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
- 1, entries_.get(i));
+
+ {
+ final int count = entries_.size();
+ for (int i = 0; i < count; i++) {
+ size +=
+
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSizeNoTag(
+ entries_.get(i));
+ }
+ size += 1 * count;
}
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
@@ -5486,7 +5530,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"VersionVector");
@@ -5508,6 +5552,12 @@ public final class ReplicatedEventSourcing
.internal_static_VersionVector_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_VersionVector_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -5572,7 +5622,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"Entry");
@@ -5593,6 +5643,12 @@ public final class ReplicatedEventSourcing
.internal_static_VersionVector_Entry_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_VersionVector_Entry_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -6390,10 +6446,15 @@ public final class ReplicatedEventSourcing
if (size != -1) return size;
size = 0;
- for (int i = 0; i < entries_.size(); i++) {
- size +=
-
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
- 1, entries_.get(i));
+
+ {
+ final int count = entries_.size();
+ for (int i = 0; i < count; i++) {
+ size +=
+
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSizeNoTag(
+ entries_.get(i));
+ }
+ size += 1 * count;
}
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
@@ -7212,7 +7273,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"ReplicatedEventMetadata");
@@ -7234,6 +7295,12 @@ public final class ReplicatedEventSourcing
.internal_static_ReplicatedEventMetadata_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_ReplicatedEventMetadata_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -8377,7 +8444,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"ReplicatedSnapshotMetadata");
@@ -8399,6 +8466,12 @@ public final class ReplicatedEventSourcing
.internal_static_ReplicatedSnapshotMetadata_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_ReplicatedSnapshotMetadata_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -8463,7 +8536,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"Seen");
@@ -8484,6 +8557,12 @@ public final class ReplicatedEventSourcing
.internal_static_ReplicatedSnapshotMetadata_Seen_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_ReplicatedSnapshotMetadata_Seen_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -9342,10 +9421,15 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
1, getVersion());
}
- for (int i = 0; i < seenPerReplica_.size(); i++) {
- size +=
-
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
- 2, seenPerReplica_.get(i));
+
+ {
+ final int count = seenPerReplica_.size();
+ for (int i = 0; i < count; i++) {
+ size +=
+
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSizeNoTag(
+ seenPerReplica_.get(i));
+ }
+ size += 1 * count;
}
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
@@ -10352,7 +10436,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"ReplicatedPublishedEventMetaData");
@@ -10374,6 +10458,12 @@ public final class ReplicatedEventSourcing
.internal_static_ReplicatedPublishedEventMetaData_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_ReplicatedPublishedEventMetaData_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -11318,6 +11408,27 @@ public final class ReplicatedEventSourcing
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
.ReplicatedPublishedEventMetaDataOrBuilder
getMetadataOrBuilder();
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @return Whether the replyTo field is set.
+ */
+ boolean hasReplyTo();
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @return The replyTo.
+ */
+ java.lang.String getReplyTo();
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @return The bytes for replyTo.
+ */
+ org.apache.pekko.protobufv3.internal.ByteString getReplyToBytes();
}
/** Protobuf type {@code PublishedEvent} */
@@ -11332,7 +11443,7 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"PublishedEvent");
@@ -11346,6 +11457,7 @@ public final class ReplicatedEventSourcing
private PublishedEvent() {
persistenceId_ = "";
+ replyTo_ = "";
}
public static final
org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
@@ -11354,6 +11466,12 @@ public final class ReplicatedEventSourcing
.internal_static_PublishedEvent_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return
org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing
+ .internal_static_PublishedEvent_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -11541,6 +11659,60 @@ public final class ReplicatedEventSourcing
: metadata_;
}
+ public static final int REPLYTO_FIELD_NUMBER = 6;
+
+ @SuppressWarnings("serial")
+ private volatile java.lang.Object replyTo_ = "";
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @return Whether the replyTo field is set.
+ */
+ @java.lang.Override
+ public boolean hasReplyTo() {
+ return ((bitField0_ & 0x00000020) != 0);
+ }
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @return The replyTo.
+ */
+ @java.lang.Override
+ public java.lang.String getReplyTo() {
+ java.lang.Object ref = replyTo_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ org.apache.pekko.protobufv3.internal.ByteString bs =
+ (org.apache.pekko.protobufv3.internal.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ replyTo_ = s;
+ }
+ return s;
+ }
+ }
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @return The bytes for replyTo.
+ */
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.ByteString getReplyToBytes() {
+ java.lang.Object ref = replyTo_;
+ if (ref instanceof java.lang.String) {
+ org.apache.pekko.protobufv3.internal.ByteString b =
+
org.apache.pekko.protobufv3.internal.ByteString.copyFromUtf8((java.lang.String)
ref);
+ replyTo_ = b;
+ return b;
+ } else {
+ return (org.apache.pekko.protobufv3.internal.ByteString) ref;
+ }
+ }
+
private byte memoizedIsInitialized = -1;
@java.lang.Override
@@ -11584,6 +11756,9 @@ public final class ReplicatedEventSourcing
if (((bitField0_ & 0x00000010) != 0)) {
output.writeMessage(5, getMetadata());
}
+ if (((bitField0_ & 0x00000020) != 0)) {
+
org.apache.pekko.protobufv3.internal.GeneratedMessage.writeString(output, 6,
replyTo_);
+ }
getUnknownFields().writeTo(output);
}
@@ -11616,6 +11791,10 @@ public final class ReplicatedEventSourcing
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
5, getMetadata());
}
+ if (((bitField0_ & 0x00000020) != 0)) {
+ size +=
+
org.apache.pekko.protobufv3.internal.GeneratedMessage.computeStringSize(6,
replyTo_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
return size;
@@ -11658,6 +11837,10 @@ public final class ReplicatedEventSourcing
if (hasMetadata()) {
if (!getMetadata().equals(other.getMetadata())) return false;
}
+ if (hasReplyTo() != other.hasReplyTo()) return false;
+ if (hasReplyTo()) {
+ if (!getReplyTo().equals(other.getReplyTo())) return false;
+ }
if (!getUnknownFields().equals(other.getUnknownFields())) return false;
return true;
}
@@ -11690,6 +11873,10 @@ public final class ReplicatedEventSourcing
hash = (37 * hash) + METADATA_FIELD_NUMBER;
hash = (53 * hash) + getMetadata().hashCode();
}
+ if (hasReplyTo()) {
+ hash = (37 * hash) + REPLYTO_FIELD_NUMBER;
+ hash = (53 * hash) + getReplyTo().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -11882,6 +12069,7 @@ public final class ReplicatedEventSourcing
metadataBuilder_.dispose();
metadataBuilder_ = null;
}
+ replyTo_ = "";
return this;
}
@@ -11948,6 +12136,10 @@ public final class ReplicatedEventSourcing
result.metadata_ = metadataBuilder_ == null ? metadata_ :
metadataBuilder_.build();
to_bitField0_ |= 0x00000010;
}
+ if (((from_bitField0_ & 0x00000020) != 0)) {
+ result.replyTo_ = replyTo_;
+ to_bitField0_ |= 0x00000020;
+ }
result.bitField0_ |= to_bitField0_;
}
@@ -11990,6 +12182,11 @@ public final class ReplicatedEventSourcing
if (other.hasMetadata()) {
mergeMetadata(other.getMetadata());
}
+ if (other.hasReplyTo()) {
+ replyTo_ = other.replyTo_;
+ bitField0_ |= 0x00000020;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
onChanged();
return this;
@@ -12058,6 +12255,12 @@ public final class ReplicatedEventSourcing
bitField0_ |= 0x00000010;
break;
} // case 42
+ case 50:
+ {
+ replyTo_ = input.readBytes();
+ bitField0_ |= 0x00000020;
+ break;
+ } // case 50
default:
{
if (!super.parseUnknownField(input, extensionRegistry, tag))
{
@@ -12549,6 +12752,98 @@ public final class ReplicatedEventSourcing
return metadataBuilder_;
}
+ private java.lang.Object replyTo_ = "";
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @return Whether the replyTo field is set.
+ */
+ public boolean hasReplyTo() {
+ return ((bitField0_ & 0x00000020) != 0);
+ }
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @return The replyTo.
+ */
+ public java.lang.String getReplyTo() {
+ java.lang.Object ref = replyTo_;
+ if (!(ref instanceof java.lang.String)) {
+ org.apache.pekko.protobufv3.internal.ByteString bs =
+ (org.apache.pekko.protobufv3.internal.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ replyTo_ = s;
+ }
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @return The bytes for replyTo.
+ */
+ public org.apache.pekko.protobufv3.internal.ByteString getReplyToBytes()
{
+ java.lang.Object ref = replyTo_;
+ if (ref instanceof String) {
+ org.apache.pekko.protobufv3.internal.ByteString b =
+
org.apache.pekko.protobufv3.internal.ByteString.copyFromUtf8((java.lang.String)
ref);
+ replyTo_ = b;
+ return b;
+ } else {
+ return (org.apache.pekko.protobufv3.internal.ByteString) ref;
+ }
+ }
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @param value The replyTo to set.
+ * @return This builder for chaining.
+ */
+ public Builder setReplyTo(java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ replyTo_ = value;
+ bitField0_ |= 0x00000020;
+ onChanged();
+ return this;
+ }
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @return This builder for chaining.
+ */
+ public Builder clearReplyTo() {
+ replyTo_ = getDefaultInstance().getReplyTo();
+ bitField0_ = (bitField0_ & ~0x00000020);
+ onChanged();
+ return this;
+ }
+
+ /**
+ * <code>optional string replyTo = 6;</code>
+ *
+ * @param value The bytes for replyTo to set.
+ * @return This builder for chaining.
+ */
+ public Builder
setReplyToBytes(org.apache.pekko.protobufv3.internal.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ replyTo_ = value;
+ bitField0_ |= 0x00000020;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:PublishedEvent)
}
@@ -12661,7 +12956,7 @@ public final class ReplicatedEventSourcing
return descriptor;
}
- private static
org.apache.pekko.protobufv3.internal.Descriptors.FileDescriptor descriptor;
+ private static final
org.apache.pekko.protobufv3.internal.Descriptors.FileDescriptor descriptor;
static {
java.lang.String[] descriptorData = {
@@ -12703,14 +12998,15 @@ public final class ReplicatedEventSourcing
+ "sequenceNr\030\002 \002(\003\"\\\n"
+ " ReplicatedPublishedEventMetaData\022\021\n"
+ "\treplicaId\030\001 \001(\t\022%\n\r"
- + "versionVector\030\002 \001(\0132\016.VersionVector\"\236\001\n"
+ + "versionVector\030\002 \001(\0132\016.VersionVector\"\257\001\n"
+ "\016PublishedEvent\022\025\n"
+ "\r"
+ "persistenceId\030\001 \001(\t\022\022\n\n"
+ "sequenceNr\030\002 \001(\003\022\031\n"
+ "\007payload\030\003 \001(\0132\010.Payload\022\021\n"
+ "\ttimestamp\030\004 \001(\003\0223\n"
- + "\010metadata\030\005
\001(\0132!.ReplicatedPublishedEventMetaData*-\n"
+ + "\010metadata\030\005
\001(\0132!.ReplicatedPublishedEventMetaData\022\017\n"
+ + "\007replyTo\030\006 \001(\t*-\n"
+ "\014ORSetDeltaOp\022\007\n"
+ "\003Add\020\000\022\n\n"
+ "\006Remove\020\001\022\010\n"
@@ -12815,7 +13111,7 @@ public final class ReplicatedEventSourcing
new
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable(
internal_static_PublishedEvent_descriptor,
new java.lang.String[] {
- "PersistenceId", "SequenceNr", "Payload", "Timestamp",
"Metadata",
+ "PersistenceId", "SequenceNr", "Payload", "Timestamp",
"Metadata", "ReplyTo",
});
descriptor.resolveAllFeaturesImmutable();
org.apache.pekko.remote.ContainerFormats.getDescriptor();
diff --git
a/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/external-replication.excludes
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/external-replication.excludes
new file mode 100644
index 0000000000..dacffd5b16
--- /dev/null
+++
b/persistence-typed/src/main/mima-filters/2.0.x.backwards.excludes/external-replication.excludes
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# External transport of replicated events (port of akka/akka-core@3cbed9032a)
+# New replyTo field added to PublishedEvent protobuf message
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing#PublishedEventOrBuilder.hasReplyTo")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing#PublishedEventOrBuilder.getReplyTo")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.persistence.typed.serialization.ReplicatedEventSourcing#PublishedEventOrBuilder.getReplyToBytes")
diff --git a/persistence-typed/src/main/protobuf/ReplicatedEventSourcing.proto
b/persistence-typed/src/main/protobuf/ReplicatedEventSourcing.proto
index 4b05c44ec2..8328a40a28 100644
--- a/persistence-typed/src/main/protobuf/ReplicatedEventSourcing.proto
+++ b/persistence-typed/src/main/protobuf/ReplicatedEventSourcing.proto
@@ -85,4 +85,5 @@ message PublishedEvent {
optional Payload payload = 3;
optional int64 timestamp = 4;
optional ReplicatedPublishedEventMetaData metadata = 5;
+ optional string replyTo = 6;
}
\ No newline at end of file
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
index ae6bb6f221..0aa0df8bed 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import org.apache.pekko
+import pekko.Done
import pekko.actor.typed
import pekko.actor.typed.ActorRef
import pekko.actor.typed.BackoffSupervisorStrategy
@@ -385,13 +386,14 @@ final class ReplicatedPublishedEventMetaData(val
replicaId: ReplicaId, private[p
/**
* INTERNAL API
*/
-@InternalApi
+@InternalStableApi
private[pekko] final case class PublishedEventImpl(
persistenceId: PersistenceId,
sequenceNumber: Long,
payload: Any,
timestamp: Long,
- replicatedMetaData: Option[ReplicatedPublishedEventMetaData])
+ replicatedMetaData: Option[ReplicatedPublishedEventMetaData],
+ replyTo: Option[ActorRef[Done]])
extends PublishedEvent
with InternalProtocol {
import scala.jdk.OptionConverters._
@@ -411,5 +413,7 @@ private[pekko] final case class PublishedEventImpl(
case _ => this
}
+ def lossyTransport: Boolean = replyTo.isEmpty
+
override def getReplicatedMetaData:
Optional[ReplicatedPublishedEventMetaData] = replicatedMetaData.toJava
}
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
index 9dce502153..74fc40f86c 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplayingEvents.scala
@@ -304,8 +304,10 @@ private[pekko] final class ReplayingEvents[C, E, S](
replicationControl = Map.empty)
val running = new
Running(setup.setMdcPhase(PersistenceMdc.RunningCmds))
val initialRunningState = setup.replication match {
- case Some(replication) => startReplicationStream(setup,
runningState, replication)
- case None => runningState
+ case Some(replication)
+ if replication.allReplicasAndQueryPlugins.values.forall(_ !=
ReplicationContextImpl.NoPlugin) =>
+ startReplicationStream(setup, runningState, replication)
+ case _ => runningState
}
setup.retention match {
case criteria: SnapshotCountRetentionCriteriaImpl if
criteria.snapshotEveryNEvents <= state.eventsReplayed =>
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplicationSetup.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplicationSetup.scala
index 5c4f00016b..4bbce5ef25 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplicationSetup.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ReplicationSetup.scala
@@ -23,6 +23,14 @@ import pekko.persistence.typed.ReplicationId
import pekko.util.OptionVal
import pekko.util.WallClock
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] object ReplicationContextImpl {
+ val NoPlugin = "no-plugin"
+}
+
/**
* INTERNAL API
*/
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
index 533b1b56c0..bb983d05a2 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
@@ -361,7 +361,7 @@ private[pekko] object Running {
"Saving event [{}] from [{}] as first time",
envelope.event.originSequenceNr,
envelope.event.originReplica)
- handleExternalReplicatedEventPersist(replication, envelope.event)
+ handleExternalReplicatedEventPersist(replication, envelope.event, None)
} else {
setup.internalLogger.debug(
"Filtering event [{}] from [{}] as it was already seen",
@@ -382,7 +382,9 @@ private[pekko] object Running {
case Some(replication) =>
event.replicatedMetaData match {
case None =>
- setup.internalLogger.warn("Received published event for [{}] but with no replicated metadata, dropping")
+ setup.internalLogger.warn(
+ "Received published event for [{}] but with no replicated metadata, dropping",
+ event.persistenceId)
this
case Some(replicatedEventMetaData) =>
onPublishedEvent(state, replication, replicatedEventMetaData,
event)
@@ -409,6 +411,7 @@ private[pekko] object Running {
"Ignoring published replicated event with seqNr [{}] from our own replica id [{}]",
event.sequenceNumber,
originReplicaId)
+ event.replyTo.foreach(_ ! pekko.Done) // probably won't happen
this
} else if (!replication.allReplicas.contains(originReplicaId)) {
log.warnN(
@@ -417,18 +420,20 @@ private[pekko] object Running {
replication.allReplicas.mkString(", "))
this
} else {
- val expectedSequenceNumber =
state.seenPerReplica.getOrElse(originReplicaId, 0L) + 1
- if (expectedSequenceNumber > event.sequenceNumber) {
- // already seen
+ val seenSequenceNr = state.seenPerReplica.getOrElse(originReplicaId,
0L)
+ if (seenSequenceNr >= event.sequenceNumber) {
+ // already seen/deduplication
if (log.isDebugEnabled)
log.debugN(
- "Ignoring published replicated event with seqNr [{}] from replica [{}] because it was already seen ([{}])",
+ "Ignoring published replicated event with seqNr [{}] from replica [{}] because it was already seen (version: {})",
event.sequenceNumber,
originReplicaId,
- expectedSequenceNumber)
+ state.seenPerReplica)
+ event.replyTo.foreach(_ ! pekko.Done)
this
- } else if (expectedSequenceNumber != event.sequenceNumber) {
- // gap in sequence numbers (message lost or query and direct replication out of sync, should heal up by itself
+ } else if (event.lossyTransport && event.sequenceNumber != (seenSequenceNr + 1)) {
+ // Lossy transport/opportunistic replication cannot allow gaps in sequence
+ // numbers (message lost or query and direct replication out of sync, should heal up by itself
// once the query catches up)
if (log.isDebugEnabled) {
log.debugN(
@@ -436,7 +441,7 @@ private[pekko] object Running {
"because expected replication seqNr was [{}] ",
event.sequenceNumber,
originReplicaId,
- expectedSequenceNumber)
+ seenSequenceNr + 1)
}
this
} else {
@@ -458,7 +463,8 @@ private[pekko] object Running {
event.event.asInstanceOf[E],
originReplicaId,
event.sequenceNumber,
- replicatedMetadata.version))
+ replicatedMetadata.version),
+ event.replyTo)
}
}
@@ -477,7 +483,8 @@ private[pekko] object Running {
private def handleExternalReplicatedEventPersist(
replication: ReplicationSetup,
- event: ReplicatedEvent[E]): Behavior[InternalProtocol] = {
+ event: ReplicatedEvent[E],
+ ackToOnPersisted: Option[pekko.actor.typed.ActorRef[pekko.Done]]):
Behavior[InternalProtocol] = {
_currentSequenceNumber = state.seqNr + 1
val isConcurrent: Boolean = event.originVersion <> state.version
val updatedVersion = event.originVersion.merge(state.version)
@@ -499,6 +506,14 @@ private[pekko] object Running {
replication.clearContext()
+ val sideEffects = ackToOnPersisted match {
+ case None => Nil
+ case Some(ref) =>
+ SideEffect[S] { _ =>
+ ref ! pekko.Done
+ } :: Nil
+ }
+
val newState2: RunningState[S, C] = internalPersist(
setup.context,
null,
@@ -516,7 +531,7 @@ private[pekko] object Running {
numberOfEvents = 1,
shouldSnapshotAfterPersist,
shouldPublish = false,
- Nil)
+ sideEffects)
}
private def handleEventPersist(
@@ -789,7 +804,7 @@ private[pekko] object Running {
val meta = setup.replication.map(replication =>
new ReplicatedPublishedEventMetaData(replication.replicaId,
state.version))
context.system.eventStream ! EventStream.Publish(
- PublishedEventImpl(setup.persistenceId, p.sequenceNr, p.payload,
p.timestamp, meta))
+ PublishedEventImpl(setup.persistenceId, p.sequenceNr, p.payload,
p.timestamp, meta, None))
}
// only once all things are applied we can revert back
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/ReplicatedEventSourcing.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/ReplicatedEventSourcing.scala
index 400c4eec02..bed846ac93 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/ReplicatedEventSourcing.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/scaladsl/ReplicatedEventSourcing.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.persistence.typed.scaladsl
import org.apache.pekko
import pekko.annotation.DoNotInherit
+import pekko.annotation.InternalStableApi
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.ReplicaId
import pekko.persistence.typed.ReplicationId
@@ -124,4 +125,25 @@ object ReplicatedEventSourcing {
eventSourcedBehaviorFactory(context).withReplication(context)
}
+ /**
+ * INTERNAL API
+ *
+ * Initialize a replicated event sourced behavior.
+ *
+ * Events from each replica for the same entityId need to be passed to it by an external stream.
+ * Care must be taken to handle events in any order as events can happen concurrently at different replicas.
+ *
+ * Using a replicated event sourced behavior means there is no longer the single writer guarantee.
+ *
+ * The journal plugin id for the entity itself can be configured using withJournalPluginId after creation.
+ */
+ @InternalStableApi
+ private[pekko] def externalReplication[Command, Event, State](
+ replicationId: ReplicationId,
+ allReplicas: Set[ReplicaId])(
+ eventSourcedBehaviorFactory: ReplicationContext =>
EventSourcedBehavior[Command, Event, State])
+ : EventSourcedBehavior[Command, Event, State] = {
+ val context = new ReplicationContextImpl(replicationId, allReplicas.map(_
-> ReplicationContextImpl.NoPlugin).toMap)
+ eventSourcedBehaviorFactory(context).withReplication(context)
+ }
}
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala
index 18338f09ba..c3926dbea5 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala
@@ -23,6 +23,8 @@ import scala.jdk.CollectionConverters._
import org.apache.pekko
import pekko.actor.ExtendedActorSystem
+import pekko.actor.typed.ActorRefResolver
+import pekko.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import pekko.annotation.InternalApi
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.ReplicaId
@@ -74,6 +76,9 @@ import pekko.serialization.{ BaseSerializer,
SerializerWithStringManifest }
with BaseSerializer {
private val wrappedSupport = new WrappedPayloadSupport(system)
+ // lazy because Serializers are initialized early on. `toTyped` might then try to
+ // initialize the classic ActorSystemAdapter extension.
+ private lazy val resolver = ActorRefResolver(system.toTyped)
private val CrdtCounterManifest = "AA"
private val CrdtCounterUpdatedManifest = "AB"
@@ -163,9 +168,7 @@ import pekko.serialization.{ BaseSerializer,
SerializerWithStringManifest }
.setPayload(wrappedSupport.payloadBuilder(impl.payload))
.setTimestamp(impl.timestamp)
- (impl.replicatedMetaData match {
- case None =>
- builder
+ impl.replicatedMetaData match {
case Some(m) =>
builder.setMetadata(
ReplicatedEventSourcing.ReplicatedPublishedEventMetaData
@@ -173,7 +176,15 @@ import pekko.serialization.{ BaseSerializer,
SerializerWithStringManifest }
.setReplicaId(m.replicaId.id)
.setVersionVector(versionVectorToProto(m.version))
.build())
- }).build().toByteArray
+ case None =>
+ }
+
+ impl.replyTo match {
+ case Some(ref) => builder.setReplyTo(resolver.toSerializationFormat(ref))
+ case None =>
+ }
+
+ builder.build().toByteArray
}
def publishedEventFromBinary(bytes: Array[Byte]): PublishedEventImpl = {
@@ -189,7 +200,9 @@ import pekko.serialization.{ BaseSerializer,
SerializerWithStringManifest }
new ReplicatedPublishedEventMetaData(
ReplicaId(protoMeta.getReplicaId),
versionVectorFromProto(protoMeta.getVersionVector)))
- } else None)
+ } else None,
+ if (!p.hasReplyTo) None
+ else Some(resolver.resolveActorRef(p.getReplyTo)))
}
def counterFromBinary(bytes: Array[Byte]): Counter =
diff --git
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializerSpec.scala
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializerSpec.scala
index 32d8b72fa2..8b3c76f940 100644
---
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializerSpec.scala
+++
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/serialization/ReplicatedEventSourcingSerializerSpec.scala
@@ -49,11 +49,23 @@ class ReplicatedEventSourcingSerializerSpec extends
ScalaTestWithActorTestKit wi
10,
"payload",
1,
- Some(new ReplicatedPublishedEventMetaData(ReplicaId("R1"),
VersionVector.empty))),
+ Some(new ReplicatedPublishedEventMetaData(ReplicaId("R1"),
VersionVector.empty)),
+ None),
assertEquality = false)
serializationTestKit.verifySerialization(
- PublishedEventImpl(PersistenceId.ofUniqueId("cat"), 10, "payload", 1,
None),
+ PublishedEventImpl(PersistenceId.ofUniqueId("cat"), 10, "payload", 1,
None, None),
+ assertEquality = false)
+
+ // verify replyTo round-trip serialization
+ serializationTestKit.verifySerialization(
+ PublishedEventImpl(
+ PersistenceId.ofUniqueId("cat"),
+ 10,
+ "payload",
+ 1,
+ Some(new ReplicatedPublishedEventMetaData(ReplicaId("R1"),
VersionVector.empty)),
+ Some(system.deadLetters)),
assertEquality = false)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]