This is an automated email from the ASF dual-hosted git repository.
engelen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new ae5bdad drop scala 2.12 (#242)
ae5bdad is described below
commit ae5bdad7bea916aded92f492c1befaf4c3f15c92
Author: PJ Fanning <[email protected]>
AuthorDate: Sat Sep 27 12:40:31 2025 +0100
drop scala 2.12 (#242)
---
.github/workflows/build-test.yml | 11 ++++-------
.../r2dbc/ConnectionFactoryProvider.scala | 3 ++-
.../pekko/persistence/r2dbc/R2dbcSettings.scala | 21 +++++++++++----------
.../r2dbc/internal/ContinuousQuery.scala | 9 +++------
.../r2dbc/internal/HighestSequenceNrDao.scala | 6 ++----
.../persistence/r2dbc/internal/R2dbcExecutor.scala | 9 ++++-----
.../persistence/r2dbc/journal/JournalDao.scala | 13 ++++++-------
.../persistence/r2dbc/journal/R2dbcJournal.scala | 15 ++++++---------
.../r2dbc/query/javadsl/R2dbcReadJournal.scala | 11 ++++++-----
.../persistence/r2dbc/snapshot/SnapshotDao.scala | 11 +++++------
.../state/javadsl/R2dbcDurableStateStore.scala | 6 +++---
.../r2dbc/state/scaladsl/DurableStateDao.scala | 10 ++++------
.../state/scaladsl/R2dbcDurableStateStore.scala | 7 +++----
.../persistence/r2dbc/migration/MigrationTool.scala | 10 ++++------
.../r2dbc/migration/MigrationToolDao.scala | 8 +++-----
project/CommonSettings.scala | 2 +-
project/Dependencies.scala | 1 -
.../projection/r2dbc/R2dbcProjectionSettings.scala | 5 +++--
.../internal/BySliceSourceProviderAdapter.scala | 21 +++++++++------------
.../r2dbc/internal/R2dbcHandlerAdapter.scala | 4 ++--
.../r2dbc/internal/R2dbcOffsetStore.scala | 21 ++++++++++-----------
.../r2dbc/internal/R2dbcProjectionImpl.scala | 4 ++--
.../projection/r2dbc/javadsl/R2dbcProjection.scala | 3 ++-
.../projection/r2dbc/javadsl/R2dbcSession.scala | 11 +++++------
.../r2dbc/TestSourceProviderWithInput.scala | 2 +-
25 files changed, 101 insertions(+), 123 deletions(-)
diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index a809c01..49fadd6 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- SCALA_VERSION: [ 2.12, 2.13, 3.3 ]
+ SCALA_VERSION: [ 2.13, 3.3 ]
JAVA_VERSION: [ 8, 11, 17, 21 ]
steps:
- name: Checkout
@@ -45,7 +45,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- SCALA_VERSION: [ 2.12, 2.13, 3.3 ]
+ SCALA_VERSION: [ 2.13, 3.3 ]
JAVA_VERSION: [ 8, 11 ]
if: github.repository == 'apache/pekko-persistence-r2dbc'
steps:
@@ -95,7 +95,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- SCALA_VERSION: [ 2.12, 2.13, 3.3 ]
+ SCALA_VERSION: [ 2.13, 3.3 ]
JAVA_VERSION: [ 8, 11 ]
if: github.repository == 'apache/pekko-persistence-r2dbc'
steps:
@@ -145,13 +145,10 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- SCALA_VERSION: [ 2.12, 2.13, 3.3 ]
+ SCALA_VERSION: [ 2.13, 3.3 ]
JAVA_VERSION: [ 11, 17, 21 ]
# only compiling on JDK 8, because certain tests depend on the higher timestamp precision added in JDK 9
include:
- - JAVA_VERSION: 8
- SCALA_VERSION: 2.12
- COMPILE_ONLY: true
- JAVA_VERSION: 8
SCALA_VERSION: 2.13
COMPILE_ONLY: true
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
index b6a5e5d..ce3ea63 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
@@ -18,8 +18,10 @@ import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
import scala.concurrent.duration.Duration
+import scala.jdk.CollectionConverters._
import scala.util.Failure
import scala.util.Success
+
import org.apache.pekko
import pekko.Done
import pekko.actor.CoordinatedShutdown
@@ -29,7 +31,6 @@ import pekko.actor.typed.ExtensionId
import
pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer
import pekko.persistence.r2dbc.ConnectionFactoryProvider.NoopCustomizer
import pekko.persistence.r2dbc.internal.R2dbcExecutor.PublisherOps
-import pekko.util.ccompat.JavaConverters._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import io.r2dbc.pool.ConnectionPool
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index 9a67a80..00f43b3 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -16,11 +16,12 @@ package org.apache.pekko.persistence.r2dbc
import java.util.Locale
import scala.concurrent.duration._
+import scala.jdk.DurationConverters._
+
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.annotation.InternalStableApi
import pekko.util.Helpers.toRootLowerCase
-import pekko.util.JavaDurationConverters._
import com.typesafe.config.Config
/**
@@ -151,7 +152,7 @@ trait ConnectionSettings {
val logDbCallsExceeding: FiniteDuration =
config.getString("log-db-calls-exceeding").toLowerCase(Locale.ROOT) match {
case "off" => -1.millis
- case _ => config.getDuration("log-db-calls-exceeding").asScala
+ case _ => config.getDuration("log-db-calls-exceeding").toScala
}
}
@@ -205,7 +206,7 @@ trait BufferSize {
trait RefreshInterval {
def config: Config
- val refreshInterval: FiniteDuration =
config.getDuration("refresh-interval").asScala
+ val refreshInterval: FiniteDuration =
config.getDuration("refresh-interval").toScala
}
/**
@@ -215,10 +216,10 @@ trait RefreshInterval {
trait BySliceQuerySettings {
def config: Config
- val behindCurrentTime: FiniteDuration =
config.getDuration("behind-current-time").asScala
+ val behindCurrentTime: FiniteDuration =
config.getDuration("behind-current-time").toScala
val backtrackingEnabled: Boolean = config.getBoolean("backtracking.enabled")
- val backtrackingWindow: FiniteDuration =
config.getDuration("backtracking.window").asScala
- val backtrackingBehindCurrentTime: FiniteDuration =
config.getDuration("backtracking.behind-current-time").asScala
+ val backtrackingWindow: FiniteDuration =
config.getDuration("backtracking.window").toScala
+ val backtrackingBehindCurrentTime: FiniteDuration =
config.getDuration("backtracking.behind-current-time").toScala
}
/**
@@ -254,11 +255,11 @@ final class ConnectionFactorySettings(config: Config) {
val initialSize: Int = config.getInt("initial-size")
val maxSize: Int = config.getInt("max-size")
- val maxIdleTime: FiniteDuration = config.getDuration("max-idle-time").asScala
- val maxLifeTime: FiniteDuration = config.getDuration("max-life-time").asScala
+ val maxIdleTime: FiniteDuration = config.getDuration("max-idle-time").toScala
+ val maxLifeTime: FiniteDuration = config.getDuration("max-life-time").toScala
- val connectTimeout: FiniteDuration =
config.getDuration("connect-timeout").asScala
- val acquireTimeout: FiniteDuration =
config.getDuration("acquire-timeout").asScala
+ val connectTimeout: FiniteDuration =
config.getDuration("connect-timeout").toScala
+ val acquireTimeout: FiniteDuration =
config.getDuration("acquire-timeout").toScala
val acquireRetry: Int = config.getInt("acquire-retry")
val validationQuery: String = config.getString("validation-query")
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ContinuousQuery.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ContinuousQuery.scala
index 422a1e7..b51055c 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ContinuousQuery.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ContinuousQuery.scala
@@ -13,16 +13,13 @@
package org.apache.pekko.persistence.r2dbc.internal
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
+import scala.util.{ Failure, Success, Try }
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
import pekko.stream.Attributes
import pekko.stream.Outlet
import pekko.stream.SourceShape
@@ -127,7 +124,7 @@ final private[r2dbc] class ContinuousQuery[S, T](
beforeQuery(state) match {
case None => runNextQuery()
case Some(beforeQueryFuture) =>
-
beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContexts.parasitic)
+
beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContext.parasitic)
}
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala
index b1aa52f..6146e05 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala
@@ -15,13 +15,11 @@ package org.apache.pekko.persistence.r2dbc.internal
import org.apache.pekko
import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import org.slf4j.LoggerFactory
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
/**
* INTERNAL API
@@ -63,7 +61,7 @@ trait HighestSequenceNrDao {
val seqNr = row.get[java.lang.Long](0, classOf[java.lang.Long])
if (seqNr eq null) 0L else seqNr.longValue
})
- .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic)
+ .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContext.parasitic)
if (log.isDebugEnabled)
result.foreach(seqNr => log.debug("Highest sequence nr for persistenceId
[{}]: [{}]", persistenceId, seqNr))
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
index 138584b..55dd88a 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
@@ -26,7 +26,6 @@ import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalStableApi
-import pekko.dispatch.ExecutionContexts
import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.Result
@@ -58,7 +57,7 @@ import reactor.core.publisher.Mono
def updateOneInTx(stmt: Statement)(implicit ec: ExecutionContext):
Future[Long] =
stmt.execute().asFuture().flatMap { result =>
-
result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContexts.parasitic)
+
result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContext.parasitic)
}
def updateBatchInTx(stmt: Statement)(implicit ec: ExecutionContext):
Future[Long] = {
@@ -76,7 +75,7 @@ import reactor.core.publisher.Mono
statements.foldLeft(Future.successful(immutable.IndexedSeq.empty[Long])) {
(acc, stmt) =>
acc.flatMap { seq =>
stmt.execute().asFuture().flatMap { res =>
- res.getRowsUpdated.asFuture().map(seq :+
_.longValue())(ExecutionContexts.parasitic)
+ res.getRowsUpdated.asFuture().map(seq :+
_.longValue())(ExecutionContext.parasitic)
}
}
}
@@ -134,7 +133,7 @@ class R2dbcExecutor(val connectionFactory:
ConnectionFactory, log: Logger, logDb
if (durationMicros >= logDbCallsExceedingMicros)
log.info("{} - getConnection took [{}] µs", logPrefix,
durationMicros)
connection
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}
/**
@@ -217,7 +216,7 @@ class R2dbcExecutor(val connectionFactory:
ConnectionFactory, log: Logger, logDb
def updateInBatchReturning[A](logPrefix: String)(
statementFactory: Connection => Statement,
mapRow: Row => A): Future[immutable.IndexedSeq[A]] = {
- import pekko.util.ccompat.JavaConverters._
+ import scala.jdk.CollectionConverters._
withConnection(logPrefix) { connection =>
val stmt = statementFactory(connection)
Flux
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index c06516d..1e5fcfb 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -15,12 +15,11 @@ package org.apache.pekko.persistence.r2dbc.journal
import java.time.Instant
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
+
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
@@ -237,7 +236,7 @@ private[r2dbc] class JournalDao(val settings:
JournalSettings, connectionFactory
if (useTimestampFromDb) {
result
} else {
- result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
+ result.map(_ => events.head.dbTimestamp)(ExecutionContext.parasitic)
}
} else {
val result = r2dbcExecutor.updateInBatchReturning(s"batch insert
[$persistenceId], [$totalEvents] events")(
@@ -254,9 +253,9 @@ private[r2dbc] class JournalDao(val settings:
JournalSettings, connectionFactory
log.debug("Wrote [{}] events for persistenceId [{}]", 1,
events.head.persistenceId)
}
if (useTimestampFromDb) {
- result.map(_.head)(ExecutionContexts.parasitic)
+ result.map(_.head)(ExecutionContext.parasitic)
} else {
- result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
+ result.map(_ => events.head.dbTimestamp)(ExecutionContext.parasitic)
}
}
}
@@ -299,7 +298,7 @@ private[r2dbc] class JournalDao(val settings:
JournalSettings, connectionFactory
result.foreach(updatedRows =>
log.debug("Deleted [{}] events for persistenceId [{}]",
updatedRows.head, persistenceId))
- result.map(_ => ())(ExecutionContexts.parasitic)
+ result.map(_ => ())(ExecutionContext.parasitic)
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
index 200d43a..c2fdd35 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
@@ -16,11 +16,9 @@ package org.apache.pekko.persistence.r2dbc.journal
import java.time.Instant
import scala.collection.immutable
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.util.{ Failure, Success, Try }
+
import com.typesafe.config.Config
import org.apache.pekko
import pekko.Done
@@ -28,7 +26,6 @@ import pekko.actor.ActorRef
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.adapter._
import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
import pekko.event.Logging
import pekko.persistence.AtomicWrite
import pekko.persistence.Persistence
@@ -181,7 +178,7 @@ private[r2dbc] final class R2dbcJournal(config: Config,
cfgPath: String) extends
writeAndPublishResult.onComplete { _ =>
self ! WriteFinished(persistenceId, writeAndPublishResult)
}
- writeAndPublishResult.map(_ => Nil)(ExecutionContexts.parasitic)
+ writeAndPublishResult.map(_ => Nil)(ExecutionContext.parasitic)
}
private def publish(messages: immutable.Seq[AtomicWrite], dbTimestamp:
Future[Instant]): Future[Done] =
@@ -196,7 +193,7 @@ private[r2dbc] final class R2dbcJournal(config: Config,
cfgPath: String) extends
}
case None =>
- dbTimestamp.map(_ => Done)(ExecutionContexts.parasitic)
+ dbTimestamp.map(_ => Done)(ExecutionContext.parasitic)
}
private def logEventsByTagsNotImplemented(): Unit = {
@@ -236,7 +233,7 @@ private[r2dbc] final class R2dbcJournal(config: Config,
cfgPath: String) extends
case Some(f) =>
log.debug("Write in progress for [{}], deferring highest seq nr until
write completed", persistenceId)
// we only want to make write - replay sequential, not fail if previous write failed
- f.recover { case _ => Done }(ExecutionContexts.parasitic)
+ f.recover { case _ => Done }(ExecutionContext.parasitic)
case None => Future.successful(Done)
}
pendingWrite.flatMap(_ => journalDao.readHighestSequenceNr(persistenceId,
fromSequenceNr))
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
index f45f91b..172f855 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala
@@ -18,9 +18,12 @@ import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage
+import scala.concurrent.ExecutionContext
+import scala.jdk.FutureConverters._
+import scala.jdk.OptionConverters._
+
import org.apache.pekko
import pekko.NotUsed
-import pekko.dispatch.ExecutionContexts
import pekko.japi.Pair
import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope }
import pekko.persistence.query.Offset
@@ -32,8 +35,6 @@ import
pekko.persistence.query.typed.javadsl.EventsBySliceQuery
import pekko.persistence.query.typed.javadsl.LoadEventQuery
import pekko.persistence.r2dbc.query.scaladsl
import pekko.stream.javadsl.Source
-import pekko.util.OptionConverters._
-import pekko.util.FutureConverters._
object R2dbcReadJournal {
val Identifier: String = scaladsl.R2dbcReadJournal.Identifier
@@ -68,7 +69,7 @@ final class R2dbcReadJournal(delegate:
scaladsl.R2dbcReadJournal)
delegate.eventsBySlices(entityType, minSlice, maxSlice, offset).asJava
override def sliceRanges(numberOfRanges: Int): util.List[Pair[Integer,
Integer]] = {
- import pekko.util.ccompat.JavaConverters._
+ import scala.jdk.CollectionConverters._
delegate
.sliceRanges(numberOfRanges)
.map(range => Pair(Integer.valueOf(range.min),
Integer.valueOf(range.max)))
@@ -94,7 +95,7 @@ final class R2dbcReadJournal(delegate:
scaladsl.R2dbcReadJournal)
delegate.currentPersistenceIds(afterId.toScala, limit).asJava
override def timestampOf(persistenceId: String, sequenceNr: Long):
CompletionStage[Optional[Instant]] =
- delegate.timestampOf(persistenceId,
sequenceNr).map(_.toJava)(ExecutionContexts.parasitic).asJava
+ delegate.timestampOf(persistenceId,
sequenceNr).map(_.toJava)(ExecutionContext.parasitic).asJava
override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long):
CompletionStage[EventEnvelope[Event]] =
delegate.loadEnvelope[Event](persistenceId, sequenceNr).asJava
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
index bc3bb94..7cf77dc 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
@@ -13,12 +13,11 @@
package org.apache.pekko.persistence.r2dbc.snapshot
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
+
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
import pekko.persistence.SnapshotSelectionCriteria
import pekko.persistence.r2dbc.ConnectionFactoryProvider
@@ -193,7 +192,7 @@ private[r2dbc] class SnapshotDao(settings:
SnapshotSettings, connectionFactory:
statement
},
collectSerializedSnapshot)
- .map(_.headOption)(ExecutionContexts.parasitic)
+ .map(_.headOption)(ExecutionContext.parasitic)
}
@@ -231,7 +230,7 @@ private[r2dbc] class SnapshotDao(settings:
SnapshotSettings, connectionFactory:
statement
}
- .map(_ => ())(ExecutionContexts.parasitic)
+ .map(_ => ())(ExecutionContext.parasitic)
}
def delete(persistenceId: String, criteria: SnapshotSelectionCriteria):
Future[Unit] = {
@@ -262,6 +261,6 @@ private[r2dbc] class SnapshotDao(settings:
SnapshotSettings, connectionFactory:
}
statement
}
- }.map(_ => ())(ExecutionContexts.parasitic)
+ }.map(_ => ())(ExecutionContext.parasitic)
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
index 709efdb..8770f85 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala
@@ -18,6 +18,7 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.concurrent.ExecutionContext
+import scala.jdk.FutureConverters._
import org.apache.pekko
import pekko.Done
@@ -31,7 +32,6 @@ import pekko.persistence.r2dbc.state.scaladsl.{
R2dbcDurableStateStore => ScalaR
import pekko.persistence.state.javadsl.DurableStateUpdateStore
import pekko.persistence.state.javadsl.GetObjectResult
import pekko.stream.javadsl.Source
-import pekko.util.FutureConverters._
object R2dbcDurableStateStore {
val Identifier: String = ScalaR2dbcDurableStateStore.Identifier
@@ -75,7 +75,7 @@ class R2dbcDurableStateStore[A](scalaStore:
ScalaR2dbcDurableStateStore[A])(impl
scalaStore.sliceForPersistenceId(persistenceId)
override def sliceRanges(numberOfRanges: Int): util.List[Pair[Integer,
Integer]] = {
- import pekko.util.ccompat.JavaConverters._
+ import scala.jdk.CollectionConverters._
scalaStore
.sliceRanges(numberOfRanges)
.map(range => Pair(Integer.valueOf(range.min),
Integer.valueOf(range.max)))
@@ -83,7 +83,7 @@ class R2dbcDurableStateStore[A](scalaStore:
ScalaR2dbcDurableStateStore[A])(impl
}
override def currentPersistenceIds(afterId: Optional[String], limit: Long):
Source[String, NotUsed] = {
- import pekko.util.OptionConverters._
+ import scala.jdk.OptionConverters._
scalaStore.currentPersistenceIds(afterId.toScala, limit).asJava
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 2c5d454..415b7fe 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -15,16 +15,14 @@ package org.apache.pekko.persistence.r2dbc.state.scaladsl
import java.time.Instant
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.concurrent.duration.{ Duration, FiniteDuration }
+
import org.apache.pekko
import pekko.Done
import pekko.NotUsed
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
@@ -302,7 +300,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
if (log.isDebugEnabled())
result.foreach(_ => log.debug("Deleted durable state for persistenceId
[{}]", persistenceId))
- result.map(_ => Done)(ExecutionContexts.parasitic)
+ result.map(_ => Done)(ExecutionContext.parasitic)
}
/**
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index ab30892..d6c43aa 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -14,8 +14,8 @@
package org.apache.pekko.persistence.r2dbc.state.scaladsl
import scala.collection.immutable
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
+
import com.typesafe.config.Config
import org.apache.pekko
import pekko.Done
@@ -23,7 +23,6 @@ import pekko.NotUsed
import pekko.actor.ExtendedActorSystem
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.adapter._
-import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.Offset
@@ -125,7 +124,7 @@ class R2dbcDurableStateStore[A](system:
ExtendedActorSystem, config: Config, cfg
.foreach(throw _)
}
Done
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}
override def sliceForPersistenceId(persistenceId: String): Int =
diff --git
a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
index 055138c..dddbfd3 100644
---
a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
+++
b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
@@ -15,17 +15,15 @@ package org.apache.pekko.persistence.r2dbc.migration
import java.time.Instant
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
+import scala.util.{ Failure, Success, Try }
+
import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
-import pekko.dispatch.ExecutionContexts
import pekko.pattern.ask
import pekko.persistence.Persistence
import pekko.persistence.SelectedSnapshot
@@ -296,7 +294,7 @@ class MigrationTool(system: ActorSystem[_]) {
val serializedRow = serializedSnapotRow(selectedSnapshot)
targetSnapshotDao
.store(serializedRow)
- .map(_ =>
snapshotMetadata.sequenceNr)(ExecutionContexts.parasitic)
+ .map(_ =>
snapshotMetadata.sequenceNr)(ExecutionContext.parasitic)
}
_ <- migrationDao.updateSnapshotProgress(persistenceId, seqNr)
} yield 1
diff --git
a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolDao.scala
b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolDao.scala
index aed6d03..9b6c500 100644
---
a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolDao.scala
+++
b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationToolDao.scala
@@ -13,15 +13,13 @@
package org.apache.pekko.persistence.r2dbc.migration
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
import pekko.persistence.r2dbc.internal.Sql.Interpolation
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.journal.JournalDao.log
@@ -70,7 +68,7 @@ import io.r2dbc.spi.ConnectionFactory
.bind(0, persistenceId)
.bind(1, seqNr)
}
- .map(_ => Done)(ExecutionContexts.parasitic)
+ .map(_ => Done)(ExecutionContext.parasitic)
}
def updateSnapshotProgress(persistenceId: String, seqNr: Long): Future[Done]
= {
@@ -87,7 +85,7 @@ import io.r2dbc.spi.ConnectionFactory
.bind(0, persistenceId)
.bind(1, seqNr)
}
- .map(_ => Done)(ExecutionContexts.parasitic)
+ .map(_ => Done)(ExecutionContext.parasitic)
}
def currentProgress(persistenceId: String): Future[Option[CurrentProgress]]
= {
diff --git a/project/CommonSettings.scala b/project/CommonSettings.scala
index 2408741..6195661 100644
--- a/project/CommonSettings.scala
+++ b/project/CommonSettings.scala
@@ -21,7 +21,7 @@ object CommonSettings extends AutoPlugin {
override def requires = JvmPlugin && ApacheSonatypePlugin && DynVerPlugin
override lazy val projectSettings = Seq(
- crossScalaVersions := Seq(Dependencies.Scala212, Dependencies.Scala213,
Dependencies.Scala3),
+ crossScalaVersions := Seq(Dependencies.Scala213, Dependencies.Scala3),
scalaVersion := Dependencies.Scala213,
crossVersion := CrossVersion.binary,
// Setting javac options in common allows IntelliJ IDEA to import them automatically
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index af6c233..42f5863 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -14,7 +14,6 @@
import sbt._
object Dependencies {
- val Scala212 = "2.12.20"
val Scala213 = "2.13.16"
val Scala3 = "3.3.6"
val PekkoVersion = PekkoCoreDependency.version
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
index 70bc11b..15eb323 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
@@ -17,12 +17,13 @@ import java.time.{ Duration => JDuration }
import java.util.Locale
import scala.concurrent.duration._
+import scala.jdk.DurationConverters._
import scala.util.hashing.MurmurHash3
+
import com.typesafe.config.Config
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.persistence.r2dbc.Dialect
-import pekko.util.JavaDurationConverters._
object R2dbcProjectionSettings {
@@ -32,7 +33,7 @@ object R2dbcProjectionSettings {
val logDbCallsExceeding: FiniteDuration =
config.getString("log-db-calls-exceeding").toLowerCase(Locale.ROOT)
match {
case "off" => -1.millis
- case _ => config.getDuration("log-db-calls-exceeding").asScala
+ case _ => config.getDuration("log-db-calls-exceeding").toScala
}
new R2dbcProjectionSettings(
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/BySliceSourceProviderAdapter.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/BySliceSourceProviderAdapter.scala
index 1c4b190..d0b9383 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/BySliceSourceProviderAdapter.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/BySliceSourceProviderAdapter.scala
@@ -18,24 +18,21 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
+import scala.jdk.OptionConverters._
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.InternalApi
-import pekko.projection.javadsl
-import pekko.projection.scaladsl
-import pekko.dispatch.ExecutionContexts
-import pekko.stream.scaladsl.Source
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.FutureConverters._
-import pekko.util.OptionConverters._
-import scala.concurrent.ExecutionContext
-
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
import pekko.persistence.query.typed.scaladsl.LoadEventQuery
+import pekko.projection.javadsl
+import pekko.projection.scaladsl
import pekko.projection.BySlicesSourceProvider
+import pekko.stream.scaladsl.Source
/**
* INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
@@ -50,7 +47,7 @@ import pekko.projection.BySlicesSourceProvider
def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope,
NotUsed]] = {
// the parasitic context is used to convert the Optional to Option and a
java streams Source to a scala Source,
// it _should_ not be used for the blocking operation of getting offsets
themselves
- val ec = pekko.dispatch.ExecutionContexts.parasitic
+ val ec = ExecutionContext.parasitic
val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] {
override def get(): CompletionStage[Optional[Offset]] =
offset().map(_.toJava)(ec).asJava
}
@@ -70,7 +67,7 @@ import pekko.projection.BySlicesSourceProvider
override def timestampOf(persistenceId: String, sequenceNr: Long):
Future[Option[Instant]] =
delegate match {
case timestampQuery:
pekko.persistence.query.typed.javadsl.EventTimestampQuery =>
- timestampQuery.timestampOf(persistenceId,
sequenceNr).asScala.map(_.toScala)(ExecutionContexts.parasitic)
+ timestampQuery.timestampOf(persistenceId,
sequenceNr).asScala.map(_.toScala)(ExecutionContext.parasitic)
case _ =>
Future.failed(
new IllegalArgumentException(
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcHandlerAdapter.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcHandlerAdapter.scala
index de71c82..c64a1ed 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcHandlerAdapter.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcHandlerAdapter.scala
@@ -15,6 +15,8 @@ package org.apache.pekko.projection.r2dbc.internal
import scala.collection.immutable
import scala.concurrent.Future
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
import org.apache.pekko
import pekko.Done
@@ -22,8 +24,6 @@ import pekko.annotation.InternalApi
import pekko.projection.r2dbc.javadsl
import pekko.projection.r2dbc.javadsl.R2dbcSession
import pekko.projection.r2dbc.scaladsl
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.FutureConverters._
/**
* INTERNAL API: Adapter from javadsl.R2dbcHandler to scaladsl.R2dbcHandler
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index 3c612ee..e2515b7 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -22,14 +22,13 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
+import scala.concurrent.{ ExecutionContext, Future }
+
import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.LoggerOps
import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.Offset
@@ -291,8 +290,8 @@ private[projection] class R2dbcOffsetStore(
case timestampQuery: EventTimestampQuery =>
timestampQuery.timestampOf(persistenceId, sequenceNr)
case timestampQuery:
pekko.persistence.query.typed.javadsl.EventTimestampQuery =>
- import pekko.util.FutureConverters._
- import pekko.util.OptionConverters._
+ import scala.jdk.FutureConverters._
+ import scala.jdk.OptionConverters._
timestampQuery.timestampOf(persistenceId,
sequenceNr).asScala.map(_.toScala)
case _ =>
throw new IllegalArgumentException(
@@ -418,7 +417,7 @@ private[projection] class R2dbcOffsetStore(
.withConnection("save offset") { conn =>
saveOffsetInTx(conn, offset)
}
- .map(_ => Done)(ExecutionContexts.parasitic)
+ .map(_ => Done)(ExecutionContext.parasitic)
}
/**
@@ -440,7 +439,7 @@ private[projection] class R2dbcOffsetStore(
.withConnection("save offsets") { conn =>
saveOffsetsInTx(conn, offsets)
}
- .map(_ => Done)(ExecutionContexts.parasitic)
+ .map(_ => Done)(ExecutionContext.parasitic)
}
def saveOffsetsInTx[Offset](conn: Connection, offsets:
immutable.IndexedSeq[Offset]): Future[Done] = {
@@ -604,7 +603,7 @@ private[projection] class R2dbcOffsetStore(
case MultipleOffsets(many) => many.map(upsertStmt).toVector
}
- R2dbcExecutor.updateInTx(statements).map(_ =>
Done)(ExecutionContexts.parasitic)
+ R2dbcExecutor.updateInTx(statements).map(_ =>
Done)(ExecutionContext.parasitic)
}
def isDuplicate(record: Record): Boolean =
@@ -884,14 +883,14 @@ private[projection] class R2dbcOffsetStore(
insertTimestampOffsetInTx(conn, records)
}
}
- .map(_ => Done)(ExecutionContexts.parasitic)
+ .map(_ => Done)(ExecutionContext.parasitic)
case _ =>
r2dbcExecutor
.withConnection("set offset") { conn =>
savePrimitiveOffsetInTx(conn, offset)
}
- .map(_ => Done)(ExecutionContexts.parasitic)
+ .map(_ => Done)(ExecutionContext.parasitic)
}
}
@@ -1001,7 +1000,7 @@ private[projection] class R2dbcOffsetStore(
.bind(2, paused)
.bind(3, Instant.now(clock).toEpochMilli)
}
- .map(_ => Done)(ExecutionContexts.parasitic)
+ .map(_ => Done)(ExecutionContext.parasitic)
}
private def createRecordWithOffset[Envelope](envelope: Envelope):
Option[RecordWithOffset] = {
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index 5452a92..4a96fd0 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -102,7 +102,7 @@ private[projection] object R2dbcProjectionImpl {
case loadEventQuery: LoadEventQuery =>
loadEventQuery.loadEnvelope[Any](pid, seqNr)
case loadEventQuery:
pekko.persistence.query.typed.javadsl.LoadEventQuery =>
- import pekko.util.FutureConverters._
+ import scala.jdk.FutureConverters._
loadEventQuery.loadEnvelope[Any](pid, seqNr).asScala
case _ =>
throw new IllegalArgumentException(
@@ -127,7 +127,7 @@ private[projection] object R2dbcProjectionImpl {
case store: DurableStateStore[_] =>
store.getObject(pid)
case store: pekko.persistence.state.javadsl.DurableStateStore[_] =>
- import pekko.util.FutureConverters._
+ import scala.jdk.FutureConverters._
store.getObject(pid).asScala.map(_.toScala)
}).map {
case GetObjectResult(Some(loadedValue), loadedRevision) =>
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcProjection.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcProjection.scala
index 007c54a..b70441c 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcProjection.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcProjection.scala
@@ -16,6 +16,8 @@ package org.apache.pekko.projection.r2dbc.javadsl
import java.util.Optional
import java.util.function.Supplier
+import scala.jdk.OptionConverters._
+
import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
@@ -38,7 +40,6 @@ import
pekko.projection.r2dbc.internal.R2dbcGroupedHandlerAdapter
import pekko.projection.r2dbc.internal.R2dbcHandlerAdapter
import pekko.projection.r2dbc.scaladsl
import pekko.stream.javadsl.FlowWithContext
-import pekko.util.OptionConverters._
@ApiMayChange
object R2dbcProjection {
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcSession.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcSession.scala
index 079cb95..ee20292 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcSession.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/javadsl/R2dbcSession.scala
@@ -17,15 +17,14 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.concurrent.ExecutionContext
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
+import scala.jdk.OptionConverters._
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.ApiMayChange
-import pekko.dispatch.ExecutionContexts
import pekko.persistence.r2dbc.internal.R2dbcExecutor
-import pekko.util.ccompat.JavaConverters._
-import pekko.util.FutureConverters._
-import pekko.util.OptionConverters._
import io.r2dbc.spi.Connection
import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
@@ -37,14 +36,14 @@ final class R2dbcSession(connection: Connection)(implicit
ec: ExecutionContext,
connection.createStatement(sql)
def updateOne(statement: Statement): CompletionStage[java.lang.Long] =
-
R2dbcExecutor.updateOneInTx(statement).map(java.lang.Long.valueOf)(ExecutionContexts.parasitic).asJava
+
R2dbcExecutor.updateOneInTx(statement).map(java.lang.Long.valueOf)(ExecutionContext.parasitic).asJava
def update(statements: java.util.List[Statement]):
CompletionStage[java.util.List[java.lang.Long]] =
R2dbcExecutor.updateInTx(statements.asScala.toVector).map(results =>
results.map(java.lang.Long.valueOf).asJava).asJava
def selectOne[A](statement: Statement)(mapRow: Row => A):
CompletionStage[Optional[A]] =
- R2dbcExecutor.selectOneInTx(statement,
mapRow).map(_.toJava)(ExecutionContexts.parasitic).asJava
+ R2dbcExecutor.selectOneInTx(statement,
mapRow).map(_.toJava)(ExecutionContext.parasitic).asJava
def select[A](statement: Statement)(mapRow: Row => A):
CompletionStage[java.util.List[A]] =
R2dbcExecutor.selectInTx(statement, mapRow).map(_.asJava).asJava
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala
index 8e6cf78..2817b76 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestSourceProviderWithInput.scala
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
+import scala.jdk.CollectionConverters._
import org.apache.pekko
import pekko.NotUsed
@@ -35,7 +36,6 @@ import pekko.projection.BySlicesSourceProvider
import pekko.projection.scaladsl.SourceProvider
import pekko.stream.OverflowStrategy
import pekko.stream.scaladsl.Source
-import pekko.util.ccompat.JavaConverters._
class TestSourceProviderWithInput()(implicit val system: ActorSystem[_])
extends SourceProvider[TimestampOffset, EventEnvelope[String]]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]