This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new 4b19e82 Port CanTriggerReplay and ScalaBySlicesSourceProviderAdapter
from akka-projection 1.4.x (#476)
4b19e82 is described below
commit 4b19e82790064bd278dad11e05a299703a7157bc
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 14:34:37 2026 +0100
Port CanTriggerReplay and ScalaBySlicesSourceProviderAdapter from
akka-projection 1.4.x (#476)
Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-projection/sessions/5ed34547-44f6-4f51-ad38-2ce0babfb1be
Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../projection/internal/CanTriggerReplay.scala | 24 ++++++++++++++++++++
.../internal/SourceProviderAdapter.scala | 26 ++++++++++++++++++++++
.../javadsl/EventSourcedProvider.scala | 12 +++++++++-
.../scaladsl/EventSourcedProvider.scala | 12 +++++++++-
4 files changed, 72 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/pekko/projection/internal/CanTriggerReplay.scala b/core/src/main/scala/org/apache/pekko/projection/internal/CanTriggerReplay.scala
new file mode 100644
index 0000000..5f53c4a
--- /dev/null
+++ b/core/src/main/scala/org/apache/pekko/projection/internal/CanTriggerReplay.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.internal
+
+import org.apache.pekko.annotation.InternalApi
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] trait CanTriggerReplay {
+ private[pekko] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit
+}
diff --git a/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala b/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
index 5535653..91f8170 100644
--- a/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
+++ b/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
@@ -24,9 +24,11 @@ import scala.jdk.OptionConverters._
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.InternalApi
+import pekko.projection.BySlicesSourceProvider
import pekko.projection.javadsl
import pekko.projection.scaladsl
import pekko.stream.scaladsl.Source
+import pekko.stream.javadsl.{ Source => JSource }
/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
@@ -49,3 +51,27 @@ import pekko.stream.scaladsl.Source
def extractCreationTime(envelope: Envelope): Long =
delegate.extractCreationTime(envelope)
}
+
+/**
+ * INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider
+ */
+@InternalApi private[projection] class ScalaBySlicesSourceProviderAdapter[Offset, Envelope](
+ delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider)
+ extends javadsl.SourceProvider[Offset, Envelope]
+ with BySlicesSourceProvider {
+ override def source(
+ offset: Supplier[CompletionStage[Optional[Offset]]])
+ : CompletionStage[JSource[Envelope, NotUsed]] =
+ delegate
+ .source(() => offset.get().asScala.map(_.toScala)(ExecutionContext.parasitic))
+ .map(_.asJava)(ExecutionContext.parasitic)
+ .asJava
+
+ override def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope)
+
+ override def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope)
+
+ def minSlice: Int = delegate.minSlice
+
+ def maxSlice: Int = delegate.maxSlice
+}
diff --git a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
index 77581e4..c3d1dd9 100644
--- a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
+++ b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
@@ -39,6 +39,7 @@ import pekko.persistence.query.typed.javadsl.EventsBySliceQuery
import pekko.persistence.query.typed.javadsl.LoadEventQuery
import pekko.projection.BySlicesSourceProvider
import pekko.projection.eventsourced.EventEnvelope
+import pekko.projection.internal.CanTriggerReplay
import pekko.projection.javadsl
import pekko.projection.javadsl.SourceProvider
import pekko.stream.javadsl.Source
@@ -127,7 +128,16 @@ object EventSourcedProvider {
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, pekko.persistence.query.typed.EventEnvelope[Event]] = {
- new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+ eventsBySlicesQuery match {
+ case query: EventsBySliceQuery with CanTriggerReplay =>
+ new EventsBySlicesSourceProvider[Event](eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+ with CanTriggerReplay {
+ private[pekko] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
+ query.triggerReplay(persistenceId, fromSeqNr)
+ }
+ case _ =>
+ new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+ }
}
def sliceForPersistenceId(system: ActorSystem[_], readJournalPluginId: String, persistenceId: String): Int =
diff --git a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
index 5a45e0f..19aa62a 100644
--- a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
+++ b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
@@ -32,6 +32,7 @@ import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery
import pekko.persistence.query.typed.scaladsl.LoadEventQuery
import pekko.projection.BySlicesSourceProvider
import pekko.projection.eventsourced.EventEnvelope
+import pekko.projection.internal.CanTriggerReplay
import pekko.projection.scaladsl.SourceProvider
import pekko.stream.scaladsl.Source
@@ -113,7 +114,16 @@ object EventSourcedProvider {
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, pekko.persistence.query.typed.EventEnvelope[Event]] = {
- new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+ eventsBySlicesQuery match {
+ case query: EventsBySliceQuery with CanTriggerReplay =>
+ new EventsBySlicesSourceProvider[Event](eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+ with CanTriggerReplay {
+ private[pekko] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
+ query.triggerReplay(persistenceId, fromSeqNr)
+ }
+ case _ =>
+ new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+ }
}
def sliceForPersistenceId(system: ActorSystem[_], readJournalPluginId: String, persistenceId: String): Int =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]