This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new b5d1eea scala 3 support (#77)
b5d1eea is described below
commit b5d1eea6023de8d03d4d567693fe9ac0abd379b8
Author: PJ Fanning <[email protected]>
AuthorDate: Sat Sep 2 19:57:35 2023 +0100
scala 3 support (#77)
* scala 3 support
---
.github/workflows/unit-tests.yml | 2 ++
build.sbt | 11 ++++++++---
.../internal/CassandraProjectionImpl.scala | 5 +++--
.../internal/HandlerRecoveryImplSpec.scala | 2 +-
.../tools/InternalProjectionStateMetricsSpec.scala | 21 ++++++++++++---------
.../pekko/projection/ProjectionBehavior.scala | 21 ++++++---------------
.../org/apache/pekko/projection/ProjectionId.scala | 2 +-
.../internal/InternalProjectionState.scala | 2 +-
.../projection/internal/ProjectionContextImpl.scala | 2 +-
.../jdbc/internal/JdbcProjectionImpl.scala | 4 ++--
project/Common.scala | 11 ++++++-----
project/Dependencies.scala | 4 +++-
.../testkit/internal/TestProjectionImpl.scala | 3 ++-
13 files changed, 48 insertions(+), 42 deletions(-)
diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml
index 990b58e..7343ae6 100644
--- a/.github/workflows/unit-tests.yml
+++ b/.github/workflows/unit-tests.yml
@@ -19,6 +19,8 @@ jobs:
- { java-version: 11, scala-version: 2.13.11, sbt-opts:
'-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' }
- { java-version: 8, scala-version: 2.12.18, sbt-opts: '' }
- { java-version: 11, scala-version: 2.12.18, sbt-opts:
'-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' }
+ - { java-version: 8, scala-version: 3.3.0, sbt-opts: '' }
+ - { java-version: 11, scala-version: 3.3.0, sbt-opts:
'-J-XX:+UnlockExperimentalVMOptions -J-XX:+UseJVMCICompiler' }
steps:
- name: Checkout
uses: actions/checkout@v3
diff --git a/build.sbt b/build.sbt
index e60c75c..6f83af4 100644
--- a/build.sbt
+++ b/build.sbt
@@ -32,6 +32,7 @@ lazy val core =
Project(id = "core", base = file("core"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
+ .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.core)
@@ -44,6 +45,7 @@ lazy val core =
lazy val coreTest =
Project(id = "core-test", base = file("core-test"))
.configs(IntegrationTest)
+ .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.disablePlugins(MimaPlugin)
.settings(Defaults.itSettings)
@@ -58,6 +60,7 @@ lazy val testkit =
Project(id = "testkit", base = file("testkit"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
+ .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.testKit)
@@ -70,6 +73,7 @@ lazy val jdbc =
Project(id = "jdbc", base = file("jdbc"))
.configs(IntegrationTest.extend(Test))
.enablePlugins(ReproducibleBuildsPlugin)
+ .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.jdbc)
@@ -84,6 +88,7 @@ lazy val slick =
Project(id = "slick", base = file("slick"))
.configs(IntegrationTest.extend(Test))
.enablePlugins(ReproducibleBuildsPlugin)
+ .settings(crossScalaVersions := Dependencies.Scala2Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.slick)
@@ -99,6 +104,7 @@ lazy val cassandra =
Project(id = "cassandra", base = file("cassandra"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
+ .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.cassandra)
@@ -115,6 +121,7 @@ lazy val cassandra =
lazy val eventsourced =
Project(id = "eventsourced", base = file("eventsourced"))
.enablePlugins(ReproducibleBuildsPlugin)
+ .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(Dependencies.eventsourced)
.settings(
name := "pekko-projection-eventsourced")
@@ -140,14 +147,12 @@ lazy val `durable-state` =
Project(id = "durable-state", base = file("durable-state"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
+ .settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(Dependencies.state)
.settings(
name := "pekko-projection-durable-state")
.dependsOn(core)
.dependsOn(testkit % Test)
- .settings(
- // no previous artifact so must disable MiMa until this is released at
least once.
- mimaPreviousArtifacts := Set.empty)
lazy val examples = project
.configs(IntegrationTest.extend(Test))
diff --git
a/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/internal/CassandraProjectionImpl.scala
b/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/internal/CassandraProjectionImpl.scala
index 194b34a..e630dd8 100644
---
a/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/internal/CassandraProjectionImpl.scala
+++
b/cassandra/src/main/scala/org/apache/pekko/projection/cassandra/internal/CassandraProjectionImpl.scala
@@ -188,7 +188,7 @@ import pekko.stream.scaladsl.Source
settings) {
override implicit def executionContext: ExecutionContext =
system.executionContext
- override val logger: LoggingAdapter = Logging(system.classicSystem,
this.getClass)
+ override val logger: LoggingAdapter = Logging(system.classicSystem,
classOf[CassandraInternalProjectionState])
private val offsetStore = new CassandraOffsetStore(system)
@@ -203,7 +203,8 @@ import pekko.stream.scaladsl.Source
offsetStore.saveOffset(projectionId, offset)
private[projection] def newRunningInstance(): RunningProjection = {
- new CassandraRunningProjection(RunningProjection.withBackoff(() =>
mappedSource(), settings), offsetStore, this)
+ new CassandraRunningProjection(RunningProjection.withBackoff(() =>
this.mappedSource(), settings), offsetStore,
+ this)
}
}
diff --git
a/core-test/src/test/scala/org/apache/pekko/projection/internal/HandlerRecoveryImplSpec.scala
b/core-test/src/test/scala/org/apache/pekko/projection/internal/HandlerRecoveryImplSpec.scala
index deac0c0..50a2e8d 100644
---
a/core-test/src/test/scala/org/apache/pekko/projection/internal/HandlerRecoveryImplSpec.scala
+++
b/core-test/src/test/scala/org/apache/pekko/projection/internal/HandlerRecoveryImplSpec.scala
@@ -55,7 +55,7 @@ class HandlerRecoveryImplSpec extends
ScalaTestWithActorTestKit with AnyWordSpec
import HandlerRecoveryImplSpec._
import TestStatusObserver._
- private val logger = Logging(system.toClassic, getClass)
+ private val logger = Logging(system.toClassic,
classOf[HandlerRecoveryImplSpec])
private val failOnOffset: Long = 3
private val env3 = Envelope(offset = failOnOffset, "c")
private val projectionId = ProjectionId("test", "1")
diff --git
a/core-test/src/test/scala/org/apache/pekko/projection/internal/metrics/tools/InternalProjectionStateMetricsSpec.scala
b/core-test/src/test/scala/org/apache/pekko/projection/internal/metrics/tools/InternalProjectionStateMetricsSpec.scala
index 3a4d423..c15f4a5 100644
---
a/core-test/src/test/scala/org/apache/pekko/projection/internal/metrics/tools/InternalProjectionStateMetricsSpec.scala
+++
b/core-test/src/test/scala/org/apache/pekko/projection/internal/metrics/tools/InternalProjectionStateMetricsSpec.scala
@@ -148,25 +148,27 @@ object InternalProjectionStateMetricsSpec {
val adaptedHandlerStrategy: HandlerStrategy = offsetStrategy match {
case ExactlyOnce(_) =>
handlerStrategy match {
- case SingleHandlerStrategy(handlerFactory) => {
+ case singleHandlerStrategy: SingleHandlerStrategy[Envelope]
@unchecked => {
val adaptedHandler = () =>
new Handler[Envelope] {
- override def process(envelope: Envelope): Future[Done] =
handlerFactory().process(envelope).flatMap {
- _ =>
- offsetStore.saveOffset(projectionId, envelope.offset)
- }
+ override def process(envelope: Envelope): Future[Done] =
+
singleHandlerStrategy.handlerFactory().process(envelope).flatMap {
+ _ =>
+ offsetStore.saveOffset(projectionId, envelope.offset)
+ }
}
SingleHandlerStrategy(adaptedHandler)
}
- case GroupedHandlerStrategy(handlerFactory, afterEnvelopes,
orAfterDuration) => {
+ case groupedHandlerStrategy: GroupedHandlerStrategy[Envelope]
@unchecked => {
val adaptedHandler = () =>
new Handler[immutable.Seq[Envelope]] {
override def process(envelopes: immutable.Seq[Envelope]):
Future[Done] =
- handlerFactory().process(envelopes).flatMap { _ =>
+
groupedHandlerStrategy.handlerFactory().process(envelopes).flatMap { _ =>
offsetStore.saveOffset(projectionId, envelopes.last.offset)
}
}
- GroupedHandlerStrategy(adaptedHandler, afterEnvelopes,
orAfterDuration)
+ GroupedHandlerStrategy(adaptedHandler,
groupedHandlerStrategy.afterEnvelopes,
+ groupedHandlerStrategy.orAfterDuration)
}
case FlowHandlerStrategy(_) => handlerStrategy
}
@@ -202,7 +204,8 @@ object InternalProjectionStateMetricsSpec {
handlerStrategy,
statusObserver,
settings) {
- override def logger: LoggingAdapter = Logging(system.classicSystem,
this.getClass)
+ override def logger: LoggingAdapter =
+ Logging(system.classicSystem,
classOf[InMemInternalProjectionState[Offset, Env]])
override implicit def executionContext: ExecutionContext =
system.executionContext
diff --git
a/core/src/main/scala/org/apache/pekko/projection/ProjectionBehavior.scala
b/core/src/main/scala/org/apache/pekko/projection/ProjectionBehavior.scala
index f78d3f4..0544e05 100644
--- a/core/src/main/scala/org/apache/pekko/projection/ProjectionBehavior.scala
+++ b/core/src/main/scala/org/apache/pekko/projection/ProjectionBehavior.scala
@@ -13,22 +13,13 @@
package org.apache.pekko.projection
-import org.apache.pekko
-import pekko.actor.typed.scaladsl.LoggerOps
-import scala.util.Failure
-import scala.util.Success
+import scala.util.{ Failure, Success }
+import org.apache.pekko
import pekko.Done
-import pekko.actor.typed.ActorRef
-import pekko.actor.typed.Behavior
-import pekko.actor.typed.PostStop
-import pekko.actor.typed.PreRestart
-import pekko.actor.typed.SupervisorStrategy
-import pekko.actor.typed.scaladsl.ActorContext
-import pekko.actor.typed.scaladsl.Behaviors
-import pekko.actor.typed.scaladsl.StashBuffer
-import pekko.annotation.ApiMayChange
-import pekko.annotation.InternalApi
+import pekko.actor.typed.{ ActorRef, Behavior, PostStop, PreRestart,
SupervisorStrategy }
+import pekko.actor.typed.scaladsl.{ ActorContext, Behaviors, LoggerOps,
StashBuffer }
+import pekko.annotation.{ ApiMayChange, InternalApi }
import pekko.projection.internal.ManagementState
import pekko.projection.scaladsl.ProjectionManagement
@@ -96,7 +87,7 @@ object ProjectionBehavior {
val running = projection.run()(ctx.system)
if (running.isInstanceOf[RunningProjectionManagement[_]])
ProjectionManagement(ctx.system).register(projection.projectionId,
ctx.self)
- new ProjectionBehavior(ctx, projection, stashBuffer).started(running)
+ new ProjectionBehavior[Any, Envelope](ctx, projection,
stashBuffer).started(running)
}
}
}
diff --git a/core/src/main/scala/org/apache/pekko/projection/ProjectionId.scala
b/core/src/main/scala/org/apache/pekko/projection/ProjectionId.scala
index 584ce31..c402e4b 100644
--- a/core/src/main/scala/org/apache/pekko/projection/ProjectionId.scala
+++ b/core/src/main/scala/org/apache/pekko/projection/ProjectionId.scala
@@ -94,7 +94,7 @@ object ProjectionId {
* @return an [[java.util.Set]] of [[ProjectionId]]s
*/
def of(name: String, keys: java.util.Set[String]):
java.util.Set[ProjectionId] =
- keys.asScala.map { key: String => new ProjectionId(name, key) }.asJava
+ keys.asScala.map { key => new ProjectionId(name, key) }.asJava
}
@ApiMayChange
diff --git
a/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
b/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
index d59b03e..cebc2d2 100644
---
a/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
+++
b/core/src/main/scala/org/apache/pekko/projection/internal/InternalProjectionState.scala
@@ -270,7 +270,7 @@ private[projection] abstract class
InternalProjectionState[Offset, Envelope](
}
sourceProvider match {
- case _: MergeableOffsetSourceProvider[Offset, Envelope] =>
+ case _: MergeableOffsetSourceProvider[_, _] =>
val batches = envelopesAndOffsets
.flatMap {
case context @ ProjectionContextImpl(offset: MergeableOffset[_]
@unchecked, _, _, _) =>
diff --git
a/core/src/main/scala/org/apache/pekko/projection/internal/ProjectionContextImpl.scala
b/core/src/main/scala/org/apache/pekko/projection/internal/ProjectionContextImpl.scala
index 7e29111..58e49ea 100644
---
a/core/src/main/scala/org/apache/pekko/projection/internal/ProjectionContextImpl.scala
+++
b/core/src/main/scala/org/apache/pekko/projection/internal/ProjectionContextImpl.scala
@@ -22,7 +22,7 @@ import pekko.projection.ProjectionContext
* @param groupSize is used only in GroupHandlerStrategies so a single context
instance
* can report that multiple envelopes were processed.
*/
-@InternalApi private[projection] case class ProjectionContextImpl[Offset,
Envelope] private (
+@InternalApi private[projection] case class ProjectionContextImpl[Offset,
Envelope](
offset: Offset,
envelope: Envelope,
externalContext: AnyRef,
diff --git
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
index 6634d9f..7fbfd1a 100644
---
a/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
+++
b/jdbc/src/main/scala/org/apache/pekko/projection/jdbc/internal/JdbcProjectionImpl.scala
@@ -259,7 +259,7 @@ private[projection] class JdbcProjectionImpl[Offset,
Envelope, S <: JdbcSession]
settings) {
implicit val executionContext: ExecutionContext = system.executionContext
- override val logger: LoggingAdapter = Logging(system.classicSystem,
this.getClass)
+ override val logger: LoggingAdapter = Logging(system.classicSystem,
classOf[JdbcInternalProjectionState])
override def readPaused(): Future[Boolean] =
offsetStore.readManagementState(projectionId).map(_.exists(_.paused))
@@ -271,7 +271,7 @@ private[projection] class JdbcProjectionImpl[Offset,
Envelope, S <: JdbcSession]
offsetStore.saveOffset(projectionId, offset)
private[projection] def newRunningInstance(): RunningProjection =
- new JdbcRunningProjection(RunningProjection.withBackoff(() =>
mappedSource(), settings), this)
+ new JdbcRunningProjection(RunningProjection.withBackoff(() =>
this.mappedSource(), settings), this)
}
private class JdbcRunningProjection(source: Source[Done, _],
projectionState: JdbcInternalProjectionState)(
diff --git a/project/Common.scala b/project/Common.scala
index ce1d546..679173b 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -41,7 +41,7 @@ object Common extends AutoPlugin {
override lazy val projectSettings = Seq(
projectInfoVersion := (if (isSnapshot.value) "snapshot" else
version.value),
crossVersion := CrossVersion.binary,
- crossScalaVersions := Dependencies.ScalaVersions,
+ crossScalaVersions := Dependencies.Scala2Versions,
scalaVersion := Dependencies.Scala213,
javacOptions ++= List("-Xlint:unchecked", "-Xlint:deprecation"),
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
@@ -54,10 +54,11 @@ object Common extends AutoPlugin {
"-doc-source-url", {
val branch = if (isSnapshot.value) "main" else s"v${version.value}"
s"https://github.com/apache/incubator-pekko-projection/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
- },
- "-skip-packages",
- "org.apache.pekko.pattern" // for some reason Scaladoc creates this
- ),
+ }) ++ (if (scalaBinaryVersion.value.startsWith("3")) {
+ Seq("-skip-packages:org.apache.pekko.pattern")
+ } else {
+ Seq("-skip-packages", "org.apache.pekko.pattern")
+ }),
autoAPIMappings := true,
apiURL :=
Some(url(s"https://pekko.apache.org/api/pekko-projection/${projectInfoVersion.value}")),
// show full stack traces and test case durations
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 3f1c9f8..5792442 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -16,7 +16,9 @@ object Dependencies {
val Scala213 = "2.13.11"
val Scala212 = "2.12.18"
- val ScalaVersions = Seq(Scala213, Scala212)
+ val Scala3 = "3.3.0"
+ val Scala2Versions = Seq(Scala213, Scala212)
+ val Scala2And3Versions = Scala2Versions.+:(Scala3)
val PekkoVersionInDocs = "1.0.1"
val ConnectorsVersionInDocs = "0.0.0+173-c12dde2b-SNAPSHOT"
diff --git
a/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestProjectionImpl.scala
b/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestProjectionImpl.scala
index 20997cf..eb27431 100644
---
a/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestProjectionImpl.scala
+++
b/testkit/src/main/scala/org/apache/pekko/projection/testkit/internal/TestProjectionImpl.scala
@@ -167,7 +167,8 @@ private[projection] class
TestInternalProjectionState[Offset, Envelope](
startOffset.foreach(offset => offsetStore.saveOffset(projectionId, offset))
- override val logger: LoggingAdapter = Logging(system.classicSystem,
this.getClass)
+ override val logger: LoggingAdapter =
+ Logging(system.classicSystem, classOf[TestInternalProjectionState[Offset,
Envelope]])
override def readPaused(): Future[Boolean] =
offsetStore.readManagementState(projectionId).map(_.exists(_.paused))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]