This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new beee0d1 Projection implementation for MySQL support (#177)
beee0d1 is described below
commit beee0d1bfd0cd46969e8ab940cf8244e7f07b5b3
Author: Domantas Petrauskas <[email protected]>
AuthorDate: Sat Nov 30 13:06:42 2024 +0200
Projection implementation for MySQL support (#177)
* Projection implementation for MySQL support
* Convert R2dbcProjectionSettings from case class to class
---------
Co-authored-by: PJ Fanning <[email protected]>
---
.github/workflows/build-test.yml | 2 +-
.../pekko/persistence/r2dbc/R2dbcSettings.scala | 20 ++-
.../pekko/persistence/r2dbc/internal/Sql.scala | 2 +-
project/Dependencies.scala | 1 +
.../r2dbcprojectionsettings.excludes | 51 +++++++
projection/src/main/resources/reference.conf | 2 +-
.../projection/r2dbc/R2dbcProjectionSettings.scala | 160 ++++++++++++++++++---
.../r2dbc/internal/R2dbcOffsetStore.scala | 44 ++++--
.../r2dbc/internal/R2dbcProjectionImpl.scala | 2 +-
.../internal/mysql/MySQLR2dbcOffsetStore.scala | 66 +++++++++
.../projection/r2dbc/EventSourcedChaosSpec.scala | 2 +-
.../r2dbc/EventSourcedEndToEndSpec.scala | 7 +-
.../projection/r2dbc/EventSourcedPubSubSpec.scala | 2 +-
.../projection/r2dbc/R2dbcOffsetStoreSpec.scala | 7 +-
.../projection/r2dbc/R2dbcProjectionSpec.scala | 46 +++---
.../r2dbc/R2dbcTimestampOffsetProjectionSpec.scala | 41 +++---
.../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 15 +-
.../apache/pekko/projection/r2dbc/TestConfig.scala | 15 ++
18 files changed, 393 insertions(+), 92 deletions(-)
diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index 6e7d7cd..80f310f 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -185,7 +185,7 @@ jobs:
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root --database=mysql < ddl-scripts/create_tables_mysql.sql
- name: test
- run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{
matrix.SCALA_VERSION }} core/${{ matrix.COMPILE_ONLY && 'Test/compile' ||
'test' }}
+ run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{
matrix.SCALA_VERSION }} ${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}
test-docs:
name: Docs
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index dd8856f..5cbfbff 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -53,13 +53,7 @@ final class R2dbcSettings(config: Config) {
val durableStateAssertSingleWriter: Boolean =
config.getBoolean("state.assert-single-writer")
- val dialect: Dialect = toRootLowerCase(config.getString("dialect")) match {
- case "yugabyte" => Dialect.Yugabyte
- case "postgres" => Dialect.Postgres
- case "mysql" => Dialect.MySQL
- case other =>
- throw new IllegalArgumentException(s"Unknown dialect [$other]. Supported
dialects are [yugabyte, postgres].")
- }
+ val dialect: Dialect = Dialect.fromString(config.getString("dialect"))
val querySettings = new QuerySettings(config.getConfig("query"))
@@ -96,6 +90,18 @@ object Dialect {
/** @since 1.1.0 */
case object MySQL extends Dialect
+
+ /** @since 1.1.0 */
+ def fromString(value: String): Dialect = {
+ toRootLowerCase(value) match {
+ case "yugabyte" => Dialect.Yugabyte
+ case "postgres" => Dialect.Postgres
+ case "mysql" => Dialect.MySQL
+ case other =>
+ throw new IllegalArgumentException(
+ s"Unknown dialect [$other]. Supported dialects are [yugabyte,
postgres, mysql].")
+ }
+ }
}
/**
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
index affe56f..6fc0649 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
@@ -54,7 +54,7 @@ object Sql {
* INTERNAL API
*/
@InternalApi
- private[r2dbc] implicit class DialectInterpolation(val sc: StringContext)
extends AnyVal {
+ private[pekko] implicit class DialectInterpolation(val sc: StringContext)
extends AnyVal {
def sql(args: Any*)(implicit dialect: Dialect): String =
dialect.replaceParameters(trimLineBreaks(sc.s(args: _*)))
}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index b555938..6eebdc7 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -89,6 +89,7 @@ object Dependencies {
r2dbcSpi,
r2dbcPool,
r2dbcPostgres % "provided,test",
+ r2dbcMysql % "provided,test",
pekkoProjectionCore,
TestDeps.pekkoProjectionEventSourced,
TestDeps.pekkoProjectionDurableState,
diff --git
a/projection/src/main/mima-filters/1.0.x.backwards.excludes/r2dbcprojectionsettings.excludes
b/projection/src/main/mima-filters/1.0.x.backwards.excludes/r2dbcprojectionsettings.excludes
new file mode 100644
index 0000000..8b28128
--- /dev/null
+++
b/projection/src/main/mima-filters/1.0.x.backwards.excludes/r2dbcprojectionsettings.excludes
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Converting case class to class
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.canEqual")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productArity")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productPrefix")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElement")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElementName")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElementNames")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productIterator")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$1")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$2")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$3")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$4")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$5")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$6")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$7")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$8")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$9")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$10")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._1")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._2")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._3")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._4")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._5")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._6")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._7")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._8")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._9")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._10")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings$")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.unapply")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.fromProduct")
diff --git a/projection/src/main/resources/reference.conf
b/projection/src/main/resources/reference.conf
index cf8a83b..67ffe1d 100644
--- a/projection/src/main/resources/reference.conf
+++ b/projection/src/main/resources/reference.conf
@@ -5,7 +5,7 @@
//#projection-config
pekko.projection.r2dbc {
- # postgres or yugabyte
+ # postgres, yugabyte or mysql
dialect = ${pekko.persistence.r2dbc.dialect}
offset-store {
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
index a2423be..70bc11b 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
@@ -17,11 +17,12 @@ import java.time.{ Duration => JDuration }
import java.util.Locale
import scala.concurrent.duration._
-
+import scala.util.hashing.MurmurHash3
+import com.typesafe.config.Config
import org.apache.pekko
-import pekko.util.JavaDurationConverters._
import pekko.actor.typed.ActorSystem
-import com.typesafe.config.Config
+import pekko.persistence.r2dbc.Dialect
+import pekko.util.JavaDurationConverters._
object R2dbcProjectionSettings {
@@ -34,7 +35,8 @@ object R2dbcProjectionSettings {
case _ => config.getDuration("log-db-calls-exceeding").asScala
}
- R2dbcProjectionSettings(
+ new R2dbcProjectionSettings(
+ dialect = Dialect.fromString(config.getString("dialect")),
schema =
Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty),
offsetTable = config.getString("offset-store.offset-table"),
timestampOffsetTable =
config.getString("offset-store.timestamp-offset-table"),
@@ -44,25 +46,149 @@ object R2dbcProjectionSettings {
keepNumberOfEntries =
config.getInt("offset-store.keep-number-of-entries"),
evictInterval = config.getDuration("offset-store.evict-interval"),
deleteInterval = config.getDuration("offset-store.delete-interval"),
- logDbCallsExceeding)
+ logDbCallsExceeding
+ )
}
def apply(system: ActorSystem[_]): R2dbcProjectionSettings =
apply(system.settings.config.getConfig(DefaultConfigPath))
+
+ def apply(
+ schema: Option[String],
+ offsetTable: String,
+ timestampOffsetTable: String,
+ managementTable: String,
+ useConnectionFactory: String,
+ timeWindow: JDuration,
+ keepNumberOfEntries: Int,
+ evictInterval: JDuration,
+ deleteInterval: JDuration,
+ logDbCallsExceeding: FiniteDuration
+ ): R2dbcProjectionSettings = new R2dbcProjectionSettings(
+ Dialect.Postgres,
+ schema,
+ offsetTable,
+ timestampOffsetTable,
+ managementTable,
+ useConnectionFactory,
+ timeWindow,
+ keepNumberOfEntries,
+ evictInterval,
+ deleteInterval,
+ logDbCallsExceeding
+ )
}
-// FIXME remove case class, and add `with` methods
-final case class R2dbcProjectionSettings(
- schema: Option[String],
- offsetTable: String,
- timestampOffsetTable: String,
- managementTable: String,
- useConnectionFactory: String,
- timeWindow: JDuration,
- keepNumberOfEntries: Int,
- evictInterval: JDuration,
- deleteInterval: JDuration,
- logDbCallsExceeding: FiniteDuration) {
+final class R2dbcProjectionSettings private (
+ val dialect: Dialect,
+ val schema: Option[String],
+ val offsetTable: String,
+ val timestampOffsetTable: String,
+ val managementTable: String,
+ val useConnectionFactory: String,
+ val timeWindow: JDuration,
+ val keepNumberOfEntries: Int,
+ val evictInterval: JDuration,
+ val deleteInterval: JDuration,
+ val logDbCallsExceeding: FiniteDuration
+) extends Serializable {
+
+ override def toString: String =
+ s"R2dbcProjectionSettings($dialect, $schema, $offsetTable,
$timestampOffsetTable, $managementTable, " +
+ s"$useConnectionFactory, $timeWindow, $keepNumberOfEntries,
$evictInterval, $deleteInterval, $logDbCallsExceeding)"
+
+ override def equals(other: Any): Boolean =
+ other match {
+ case that: R2dbcProjectionSettings =>
+ dialect == that.dialect && schema == that.schema &&
+ offsetTable == that.offsetTable && timestampOffsetTable ==
that.timestampOffsetTable &&
+ managementTable == that.managementTable && useConnectionFactory ==
that.useConnectionFactory &&
+ timeWindow == that.timeWindow && keepNumberOfEntries ==
that.keepNumberOfEntries &&
+ evictInterval == that.evictInterval && deleteInterval ==
that.deleteInterval &&
+ logDbCallsExceeding == that.logDbCallsExceeding
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ val values = Seq(
+ dialect,
+ schema,
+ offsetTable,
+ timestampOffsetTable,
+ managementTable,
+ useConnectionFactory,
+ timeWindow,
+ keepNumberOfEntries,
+ evictInterval,
+ deleteInterval,
+ logDbCallsExceeding
+ )
+ val h = values.foldLeft(MurmurHash3.productSeed) { case (h, value) =>
+ MurmurHash3.mix(h, value.##)
+ }
+ MurmurHash3.finalizeHash(h, values.size)
+ }
+
+ private[this] def copy(
+ dialect: Dialect = dialect,
+ schema: Option[String] = schema,
+ offsetTable: String = offsetTable,
+ timestampOffsetTable: String = timestampOffsetTable,
+ managementTable: String = managementTable,
+ useConnectionFactory: String = useConnectionFactory,
+ timeWindow: JDuration = timeWindow,
+ keepNumberOfEntries: Int = keepNumberOfEntries,
+ evictInterval: JDuration = evictInterval,
+ deleteInterval: JDuration = deleteInterval,
+ logDbCallsExceeding: FiniteDuration = logDbCallsExceeding
+ ): R2dbcProjectionSettings =
+ new R2dbcProjectionSettings(
+ dialect,
+ schema,
+ offsetTable,
+ timestampOffsetTable,
+ managementTable,
+ useConnectionFactory,
+ timeWindow,
+ keepNumberOfEntries,
+ evictInterval,
+ deleteInterval,
+ logDbCallsExceeding
+ )
+
+ def withDialect(dialect: Dialect): R2dbcProjectionSettings =
+ copy(dialect = dialect)
+
+ def withSchema(schema: Option[String]): R2dbcProjectionSettings =
+ copy(schema = schema)
+
+ def withOffsetTable(offsetTable: String): R2dbcProjectionSettings =
+ copy(offsetTable = offsetTable)
+
+ def withTimestampOffsetTable(timestampOffsetTable: String):
R2dbcProjectionSettings =
+ copy(timestampOffsetTable = timestampOffsetTable)
+
+ def withManagementTable(managementTable: String): R2dbcProjectionSettings =
+ copy(managementTable = managementTable)
+
+ def withUseConnectionFactory(useConnectionFactory: String):
R2dbcProjectionSettings =
+ copy(useConnectionFactory = useConnectionFactory)
+
+ def withTimeWindow(timeWindow: JDuration): R2dbcProjectionSettings =
+ copy(timeWindow = timeWindow)
+
+ def withKeepNumberOfEntries(keepNumberOfEntries: Int):
R2dbcProjectionSettings =
+ copy(keepNumberOfEntries = keepNumberOfEntries)
+
+ def withEvictInterval(evictInterval: JDuration): R2dbcProjectionSettings =
+ copy(evictInterval = evictInterval)
+
+ def withDeleteInterval(deleteInterval: JDuration): R2dbcProjectionSettings =
+ copy(deleteInterval = deleteInterval)
+
+ def withLogDbCallsExceeding(logDbCallsExceeding: FiniteDuration):
R2dbcProjectionSettings =
+ copy(logDbCallsExceeding = logDbCallsExceeding)
+
val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") +
offsetTable
val timestampOffsetTableWithSchema: String = schema.map(_ +
".").getOrElse("") + timestampOffsetTable
val managementTableWithSchema: String = schema.map(_ + ".").getOrElse("") +
managementTable
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index 3a0a552..3c612ee 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -24,7 +24,6 @@ import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
-
import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
@@ -38,8 +37,9 @@ import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.UpdatedDurableState
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
+import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.internal.R2dbcExecutor
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.typed.PersistenceId
import pekko.projection.BySlicesSourceProvider
import pekko.projection.MergeableOffset
@@ -49,6 +49,7 @@ import pekko.projection.internal.OffsetSerialization
import pekko.projection.internal.OffsetSerialization.MultipleOffsets
import pekko.projection.internal.OffsetSerialization.SingleOffset
import pekko.projection.r2dbc.R2dbcProjectionSettings
+import pekko.projection.r2dbc.internal.mysql.MySQLR2dbcOffsetStore
import io.r2dbc.spi.Connection
import io.r2dbc.spi.Statement
import org.slf4j.LoggerFactory
@@ -154,6 +155,22 @@ object R2dbcOffsetStore {
val FutureDone: Future[Done] = Future.successful(Done)
val FutureTrue: Future[Boolean] = Future.successful(true)
val FutureFalse: Future[Boolean] = Future.successful(false)
+
+ def fromConfig(
+ projectionId: ProjectionId,
+ sourceProvider: Option[BySlicesSourceProvider],
+ system: ActorSystem[_],
+ settings: R2dbcProjectionSettings,
+ r2dbcExecutor: R2dbcExecutor,
+ clock: Clock = Clock.systemUTC()
+ ): R2dbcOffsetStore = {
+ settings.dialect match {
+ case Dialect.Postgres | Dialect.Yugabyte =>
+ new R2dbcOffsetStore(projectionId, sourceProvider, system, settings,
r2dbcExecutor, clock)
+ case Dialect.MySQL =>
+ new MySQLR2dbcOffsetStore(projectionId, sourceProvider, system,
settings, r2dbcExecutor, clock)
+ }
+ }
}
/**
@@ -170,6 +187,9 @@ private[projection] class R2dbcOffsetStore(
import R2dbcOffsetStore._
+ implicit protected val dialect: Dialect = settings.dialect
+ protected lazy val timestampSql: String = "transaction_timestamp()"
+
// FIXME include projectionId in all log messages
private val logger = LoggerFactory.getLogger(this.getClass)
@@ -181,8 +201,8 @@ private[projection] class R2dbcOffsetStore(
import offsetSerialization.toStorageRepresentation
private val timestampOffsetTable = settings.timestampOffsetTableWithSchema
- private val offsetTable = settings.offsetTableWithSchema
- private val managementTable = settings.managementTableWithSchema
+ protected val offsetTable = settings.offsetTableWithSchema
+ protected val managementTable = settings.managementTableWithSchema
private[projection] implicit val executionContext: ExecutionContext =
system.executionContext
@@ -195,7 +215,7 @@ private[projection] class R2dbcOffsetStore(
private val insertTimestampOffsetSql: String = sql"""
INSERT INTO $timestampOffsetTable
(projection_name, projection_key, slice, persistence_id, seq_nr,
timestamp_offset, timestamp_consumed)
- VALUES (?,?,?,?,?,?, transaction_timestamp())"""
+ VALUES (?,?,?,?,?,?, $timestampSql)"""
// delete less than a timestamp
private val deleteOldTimestampOffsetSql: String =
@@ -211,7 +231,7 @@ private[projection] class R2dbcOffsetStore(
private val selectOffsetSql: String =
sql"SELECT projection_key, current_offset, manifest, mergeable FROM
$offsetTable WHERE projection_name = ?"
- private val upsertOffsetSql: String = sql"""
+ protected val upsertOffsetSql: String = sql"""
INSERT INTO $offsetTable
(projection_name, projection_key, current_offset, manifest, mergeable,
last_updated)
VALUES (?,?,?,?,?,?)
@@ -518,8 +538,10 @@ private[projection] class R2dbcOffsetStore(
} else {
// TODO Try Batch without bind parameters for better performance. Risk
of sql injection for these parameters is low.
val boundStatement =
- records.foldLeft(statement) { (stmt, rec) =>
- stmt.add()
+ records.zipWithIndex.foldLeft(statement) { case (stmt, (rec, idx)) =>
+ if (idx != 0) {
+ stmt.add()
+ }
bindRecord(stmt, rec)
}
R2dbcExecutor.updateBatchInTx(boundStatement)
@@ -979,11 +1001,7 @@ private[projection] class R2dbcOffsetStore(
.bind(2, paused)
.bind(3, Instant.now(clock).toEpochMilli)
}
- .flatMap {
- case i if i == 1 => Future.successful(Done)
- case _ =>
- Future.failed(new RuntimeException(s"Failed to update management
table for $projectionId"))
- }
+ .map(_ => Done)(ExecutionContexts.parasitic)
}
private def createRecordWithOffset[Envelope](envelope: Envelope):
Option[RecordWithOffset] = {
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index 5e41ebd..610b442 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -87,7 +87,7 @@ private[projection] object R2dbcProjectionImpl {
connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]) =
{
val r2dbcExecutor =
new R2dbcExecutor(connectionFactory, log,
settings.logDbCallsExceeding)(system.executionContext, system)
- new R2dbcOffsetStore(projectionId, sourceProvider, system, settings,
r2dbcExecutor)
+ R2dbcOffsetStore.fromConfig(projectionId, sourceProvider, system,
settings, r2dbcExecutor)
}
private val loadEnvelopeCounter = new AtomicLong
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
new file mode 100644
index 0000000..fe988c0
--- /dev/null
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.projection.r2dbc.internal.mysql
+
+import java.time.Clock
+
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.projection.BySlicesSourceProvider
+import pekko.projection.ProjectionId
+import pekko.projection.r2dbc.R2dbcProjectionSettings
+import pekko.projection.r2dbc.internal.R2dbcOffsetStore
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[projection] class MySQLR2dbcOffsetStore(
+ projectionId: ProjectionId,
+ sourceProvider: Option[BySlicesSourceProvider],
+ system: ActorSystem[_],
+ settings: R2dbcProjectionSettings,
+ r2dbcExecutor: R2dbcExecutor,
+ clock: Clock = Clock.systemUTC())
+ extends R2dbcOffsetStore(projectionId, sourceProvider, system, settings,
r2dbcExecutor, clock) {
+
+ override lazy val timestampSql: String = "NOW(6)"
+
+ override val upsertOffsetSql: String = sql"""
+ INSERT INTO $offsetTable
+ (projection_name, projection_key, current_offset, manifest, mergeable,
last_updated)
+ VALUES (?,?,?,?,?,?) AS excluded
+ ON DUPLICATE KEY UPDATE
+ current_offset = excluded.current_offset,
+ manifest = excluded.manifest,
+ mergeable = excluded.mergeable,
+ last_updated = excluded.last_updated"""
+
+ override val updateManagementStateSql: String = sql"""
+ INSERT INTO $managementTable
+ (projection_name, projection_key, paused, last_updated)
+ VALUES (?,?,?,?) AS excluded
+ ON DUPLICATE KEY UPDATE
+ paused = excluded.paused,
+ last_updated = excluded.last_updated"""
+}
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala
index a05f510..2e57692 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala
@@ -201,7 +201,7 @@ class EventSourcedChaosSpec
(1 to expectedEventCounts).foreach { _ =>
// not using receiveMessages(expectedEvents) for better logging in
case of failure
try {
- processed :+= processedProbe.receiveMessage(15.seconds)
+ processed :+= processedProbe.receiveMessage(30.seconds)
} catch {
case e: AssertionError =>
val missing =
expectedEvents.diff(processed.map(_.envelope.event))
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
index dd9f430..f633fc4 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -19,7 +19,6 @@ import java.util.UUID
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
@@ -29,8 +28,9 @@ import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
@@ -152,6 +152,7 @@ class EventSourcedEndToEndSpec
// to be able to store events with specific timestamps
private def writeEvent(persistenceId: String, seqNr: Long, timestamp:
Instant, event: String): Unit = {
log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId,
seqNr: java.lang.Long, event, timestamp)
+ implicit val dialect: Dialect = projectionSettings.dialect
val insertEventSql = sql"""
INSERT INTO ${journalSettings.journalTableWithSchema}
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer,
adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
@@ -269,7 +270,7 @@ class EventSourcedEndToEndSpec
(1 to numberOfEvents).foreach { _ =>
// not using receiveMessages(expectedEvents) for better logging in
case of failure
try {
- processed :+= processedProbe.receiveMessage(15.seconds)
+ processed :+= processedProbe.receiveMessage(30.seconds)
} catch {
case e: AssertionError =>
val missing = expectedEvents.diff(processed.map(_.envelope.event))
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
index e3e2e0c..9c88477 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
@@ -149,7 +149,7 @@ class EventSourcedPubSubSpec
(1 to numberOfEvents).foreach { _ =>
// not using receiveMessages(expectedEvents) for better logging in case
of failure
try {
- processed :+= processedProbe.receiveMessage(25.seconds)
+ processed :+= processedProbe.receiveMessage(30.seconds)
} catch {
case e: AssertionError =>
val missing = expectedEvents.diff(processed.map(_.envelope.event))
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
index b9af994..b957eb7 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
@@ -17,14 +17,14 @@ import java.time.Instant
import java.util.UUID
import scala.concurrent.ExecutionContext
-
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorSystem
import pekko.persistence.query.Sequence
import pekko.persistence.query.TimeBasedUUID
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.projection.MergeableOffset
import pekko.projection.ProjectionId
import pekko.projection.internal.ManagementState
@@ -46,12 +46,13 @@ class R2dbcOffsetStoreSpec
private val settings = R2dbcProjectionSettings(testKit.system)
private def createOffsetStore(projectionId: ProjectionId) =
- new R2dbcOffsetStore(projectionId, None, system, settings, r2dbcExecutor,
clock)
+ R2dbcOffsetStore.fromConfig(projectionId, None, system, settings,
r2dbcExecutor, clock)
private val table = settings.offsetTableWithSchema
private implicit val ec: ExecutionContext = system.executionContext
+ implicit val dialect: Dialect = settings.dialect
def selectLastSql: String =
sql"SELECT * FROM $table WHERE projection_name = ? AND projection_key = ?"
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
index 0560568..90c606d 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
@@ -23,7 +23,6 @@ import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.Done
import pekko.NotUsed
@@ -32,8 +31,9 @@ import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.actor.typed.ActorSystem
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.projection.HandlerRecoveryStrategy
import pekko.projection.OffsetVerification
import pekko.projection.OffsetVerification.VerificationFailure
@@ -97,18 +97,21 @@ object R2dbcProjectionSpec {
val table = "projection_spec_model"
val createTableSql: String =
- s"""|CREATE table IF NOT EXISTS "$table" (
+ s"""|CREATE table IF NOT EXISTS $table (
| id VARCHAR(255) NOT NULL,
| concatenated VARCHAR(255) NOT NULL,
| PRIMARY KEY(id)
|);""".stripMargin
}
- final case class TestRepository(session: R2dbcSession)(implicit ec:
ExecutionContext, system: ActorSystem[_]) {
+ final case class TestRepository(session: R2dbcSession, settings:
R2dbcProjectionSettings)(
+ implicit ec: ExecutionContext, system: ActorSystem[_]) {
import TestRepository.table
private val logger = LoggerFactory.getLogger(this.getClass)
+ implicit private val dialect: Dialect = settings.dialect
+
def concatToText(id: String, payload: String): Future[Done] = {
val savedStrOpt = findById(id)
@@ -134,14 +137,23 @@ object R2dbcProjectionSpec {
private def upsert(concatStr: ConcatStr): Future[Done] = {
logger.debug("TestRepository.upsert: [{}]", concatStr)
- val stmtSql =
- sql"""
- INSERT INTO "$table" (id, concatenated) VALUES (?, ?)
+ val stmtSql = dialect match {
+ case Dialect.Postgres | Dialect.Yugabyte =>
+ sql"""
+ INSERT INTO $table (id, concatenated) VALUES (?, ?)
ON CONFLICT (id)
DO UPDATE SET
id = excluded.id,
concatenated = excluded.concatenated
"""
+ case Dialect.MySQL =>
+ sql"""
+ INSERT INTO $table (id, concatenated) VALUES (?, ?) AS excluded
+ ON DUPLICATE KEY UPDATE
+ id = excluded.id,
+ concatenated = excluded.concatenated
+ """
+ }
val stmt = session
.createStatement(stmtSql)
.bind(0, concatStr.id)
@@ -182,7 +194,7 @@ class R2dbcProjectionSpec
private val logger = LoggerFactory.getLogger(getClass)
private val settings = R2dbcProjectionSettings(testKit.system)
private def createOffsetStore(projectionId: ProjectionId): R2dbcOffsetStore =
- new R2dbcOffsetStore(projectionId, None, system, settings, r2dbcExecutor)
+ R2dbcOffsetStore.fromConfig(projectionId, None, system, settings,
r2dbcExecutor)
private val projectionTestKit = ProjectionTestKit(system)
override protected def beforeAll(): Unit = {
@@ -245,7 +257,7 @@ class R2dbcProjectionSpec
private def withRepo[R](fun: TestRepository => Future[R]): Future[R] = {
r2dbcExecutor.withConnection("test") { conn =>
val session = new R2dbcSession(conn)
- fun(TestRepository(session))
+ fun(TestRepository(session, settings))
}
}
@@ -260,7 +272,7 @@ class R2dbcProjectionSpec
throw TestException(concatHandlerFail4Msg + s" after $attempts
attempts")
} else {
logger.debug("handling {}", envelope)
- TestRepository(session).concatToText(envelope.id, envelope.message)
+ TestRepository(session, settings).concatToText(envelope.id,
envelope.message)
}
}
}
@@ -441,7 +453,7 @@ class R2dbcProjectionSpec
val bogusEventHandler = new R2dbcHandler[Envelope] {
override def process(session: R2dbcSession, envelope: Envelope):
Future[Done] = {
- val repo = TestRepository(session)
+ val repo = TestRepository(session, settings)
if (envelope.offset == 4L) repo.updateWithNullValue(envelope.id)
else repo.concatToText(envelope.id, envelope.message)
}
@@ -492,7 +504,7 @@ class R2dbcProjectionSpec
verificationProbe.receiveMessage().offset shouldEqual envelope.offset
processProbe.ref ! ProbeMessage("process", envelope.offset)
- TestRepository(session).concatToText(envelope.id, envelope.message)
+ TestRepository(session, settings).concatToText(envelope.id,
envelope.message)
}
}
@@ -588,7 +600,7 @@ class R2dbcProjectionSpec
if (envelopes.isEmpty)
Future.successful(Done)
else {
- val repo = TestRepository(session)
+ val repo = TestRepository(session, settings)
val id = envelopes.head.id
repo.findById(id).flatMap { existing =>
val newConcatStr =
envelopes.foldLeft(existing.getOrElse(ConcatStr(id, ""))) { (acc, env) =>
@@ -925,7 +937,7 @@ class R2dbcProjectionSpec
handler = () =>
R2dbcHandler[Envelope] { (session, envelope) =>
verifiedProbe.expectMessage(envelope.offset)
- TestRepository(session).concatToText(envelope.id,
envelope.message)
+ TestRepository(session, settings).concatToText(envelope.id,
envelope.message)
})
projectionTestKit.run(projection) {
@@ -1292,7 +1304,7 @@ class R2dbcProjectionSpec
sourceProvider = sourceProvider(entityId),
handler = () =>
R2dbcHandler[Envelope] { (session, envelope) =>
- TestRepository(session).concatToText(envelope.id,
envelope.message)
+ TestRepository(session, settings).concatToText(envelope.id,
envelope.message)
})
offsetShouldBeEmpty()
@@ -1324,7 +1336,7 @@ class R2dbcProjectionSpec
sourceProvider = sourceProvider(entityId),
handler = () =>
R2dbcHandler[Envelope] { (session, envelope) =>
- TestRepository(session).concatToText(envelope.id,
envelope.message)
+ TestRepository(session, settings).concatToText(envelope.id,
envelope.message)
})
offsetShouldBeEmpty()
@@ -1357,7 +1369,7 @@ class R2dbcProjectionSpec
sourceProvider = sourceProvider(entityId),
handler = () =>
R2dbcHandler[Envelope] { (session, envelope) =>
- TestRepository(session).concatToText(envelope.id,
envelope.message)
+ TestRepository(session, settings).concatToText(envelope.id,
envelope.message)
})
offsetShouldBeEmpty()
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
index 3685242..f24b88f 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
@@ -14,6 +14,7 @@
package org.apache.pekko.projection.r2dbc
import java.time.Instant
+import java.time.temporal.ChronoUnit
import java.time.{ Duration => JDuration }
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
@@ -25,7 +26,6 @@ import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.Done
import pekko.NotUsed
@@ -154,7 +154,7 @@ class R2dbcTimestampOffsetProjectionSpec
private def createOffsetStore(
projectionId: ProjectionId,
sourceProvider: TestTimestampSourceProvider): R2dbcOffsetStore =
- new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings,
r2dbcExecutor)
+ R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider), system,
settings, r2dbcExecutor)
private val projectionTestKit = ProjectionTestKit(system)
override protected def beforeAll(): Unit = {
@@ -228,7 +228,7 @@ class R2dbcTimestampOffsetProjectionSpec
private def withRepo[R](fun: TestRepository => Future[R]): Future[R] = {
r2dbcExecutor.withConnection("test") { conn =>
val session = new R2dbcSession(conn)
- fun(TestRepository(session))
+ fun(TestRepository(session, settings))
}
}
@@ -245,7 +245,7 @@ class R2dbcTimestampOffsetProjectionSpec
throw TestException(concatHandlerFail4Msg + s" after $attempts
attempts")
} else {
logger.debug(s"handling {}", envelope)
- TestRepository(session).concatToText(envelope.persistenceId,
envelope.event)
+ TestRepository(session, settings).concatToText(envelope.persistenceId,
envelope.event)
}
}
@@ -287,8 +287,13 @@ class R2dbcTimestampOffsetProjectionSpec
}
}
+ def now(): Instant = {
+ // supported databases do not store more than 6 fractional digits
+ Instant.now().truncatedTo(ChronoUnit.MICROS)
+ }
+
def createEnvelopesWithDuplicates(pid1: Pid, pid2: Pid):
Vector[EventEnvelope[String]] = {
- val startTime = Instant.now()
+ val startTime = now()
Vector(
createEnvelope(pid1, 1, startTime, "e1-1"),
@@ -342,7 +347,7 @@ class R2dbcTimestampOffsetProjectionSpec
if (envelopes.isEmpty)
Future.successful(Done)
else {
- val repo = TestRepository(session)
+ val repo = TestRepository(session, settings)
val results = envelopes.groupBy(_.persistenceId).map { case (pid,
envs) =>
repo.findById(pid).flatMap { existing =>
val newConcatStr = envs.foldLeft(existing.getOrElse(ConcatStr(pid,
""))) { (acc, env) =>
@@ -552,7 +557,7 @@ class R2dbcTimestampOffsetProjectionSpec
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- val startTime = Instant.now()
+ val startTime = now()
val envelopes1 = createEnvelopesUnknownSequenceNumbers(startTime, pid1,
pid2)
val sourceProvider1 = createSourceProvider(envelopes1)
implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider1)
@@ -664,7 +669,7 @@ class R2dbcTimestampOffsetProjectionSpec
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- val startTime = Instant.now()
+ val startTime = now()
val envelopes1 = createEnvelopesUnknownSequenceNumbers(startTime, pid1,
pid2)
val sourceProvider1 = createBacktrackingSourceProvider(envelopes1)
implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider1)
@@ -813,10 +818,10 @@ class R2dbcTimestampOffsetProjectionSpec
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- val startTime = Instant.now()
+ val startTime = now()
val sourceProvider = new TestSourceProviderWithInput()
implicit val offsetStore: R2dbcOffsetStore =
- new R2dbcOffsetStore(projectionId, Some(sourceProvider), system,
settings, r2dbcExecutor)
+ R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider),
system, settings, r2dbcExecutor)
val result1 = new StringBuffer()
val result2 = new StringBuffer()
@@ -950,10 +955,10 @@ class R2dbcTimestampOffsetProjectionSpec
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- val startTime = Instant.now()
+ val startTime = now()
val sourceProvider = new TestSourceProviderWithInput()
implicit val offsetStore: R2dbcOffsetStore =
- new R2dbcOffsetStore(projectionId, Some(sourceProvider), system,
settings, r2dbcExecutor)
+ R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider),
system, settings, r2dbcExecutor)
val projectionRef = spawn(
ProjectionBehavior(
@@ -1154,10 +1159,10 @@ class R2dbcTimestampOffsetProjectionSpec
val pid1 = UUID.randomUUID().toString
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- val startTime = Instant.now()
+ val startTime = now()
val sourceProvider = new TestSourceProviderWithInput()
implicit val offsetStore: R2dbcOffsetStore =
- new R2dbcOffsetStore(projectionId, Some(sourceProvider), system,
settings, r2dbcExecutor)
+ R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider),
system, settings, r2dbcExecutor)
val result1 = new StringBuffer()
val result2 = new StringBuffer()
@@ -1296,10 +1301,10 @@ class R2dbcTimestampOffsetProjectionSpec
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- val startTime = Instant.now()
+ val startTime = now()
val sourceProvider = new TestSourceProviderWithInput()
implicit val offsetStore: R2dbcOffsetStore =
- new R2dbcOffsetStore(projectionId, Some(sourceProvider), system,
settings, r2dbcExecutor)
+ R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider),
system, settings, r2dbcExecutor)
val flowHandler =
FlowWithContext[EventEnvelope[String], ProjectionContext]
@@ -1393,7 +1398,7 @@ class R2dbcTimestampOffsetProjectionSpec
sourceProvider,
handler = () =>
R2dbcHandler[EventEnvelope[String]] { (session, envelope) =>
- TestRepository(session).concatToText(envelope.persistenceId,
envelope.event)
+ TestRepository(session,
settings).concatToText(envelope.persistenceId, envelope.event)
})
offsetShouldBeEmpty()
@@ -1427,7 +1432,7 @@ class R2dbcTimestampOffsetProjectionSpec
sourceProvider,
handler = () =>
R2dbcHandler[EventEnvelope[String]] { (session, envelope) =>
- TestRepository(session).concatToText(envelope.persistenceId,
envelope.event)
+ TestRepository(session,
settings).concatToText(envelope.persistenceId, envelope.event)
})
offsetShouldBeEmpty()
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
index f1e91cf..0708d74 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
@@ -81,7 +81,7 @@ class R2dbcTimestampOffsetStoreSpec
projectionId: ProjectionId,
customSettings: R2dbcProjectionSettings = settings,
eventTimestampQueryClock: TestClock = clock) =
- new R2dbcOffsetStore(
+ R2dbcOffsetStore.fromConfig(
projectionId,
Some(new TestTimestampSourceProvider(0, persistenceExt.numberOfSlices -
1, eventTimestampQueryClock)),
system,
@@ -281,7 +281,7 @@ class R2dbcTimestampOffsetStoreSpec
slice4 shouldBe 656
val offsetStore0 =
- new R2dbcOffsetStore(
+ R2dbcOffsetStore.fromConfig(
projectionId0,
Some(new TestTimestampSourceProvider(0,
persistenceExt.numberOfSlices - 1, clock)),
system,
@@ -302,7 +302,7 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore0.saveOffset(offset4).futureValue
val offsetStore1 =
- new R2dbcOffsetStore(
+ R2dbcOffsetStore.fromConfig(
projectionId1,
Some(new TestTimestampSourceProvider(0, 511, clock)),
system,
@@ -312,7 +312,7 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore1.getState().byPid.keySet shouldBe Set(p1, p2)
val offsetStore2 =
- new R2dbcOffsetStore(
+ R2dbcOffsetStore.fromConfig(
projectionId2,
Some(new TestTimestampSourceProvider(512, 1023, clock)),
system,
@@ -561,7 +561,7 @@ class R2dbcTimestampOffsetStoreSpec
"evict old records" in {
val projectionId = genRandomProjectionId()
- val evictSettings = settings.copy(timeWindow = JDuration.ofSeconds(100),
evictInterval = JDuration.ofSeconds(10))
+ val evictSettings =
settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10))
import evictSettings._
val offsetStore = createOffsetStore(projectionId, evictSettings)
@@ -604,7 +604,7 @@ class R2dbcTimestampOffsetStoreSpec
"delete old records" in {
val projectionId = genRandomProjectionId()
- val deleteSettings = settings.copy(timeWindow = JDuration.ofSeconds(100))
+ val deleteSettings = settings.withTimeWindow(JDuration.ofSeconds(100))
import deleteSettings._
val offsetStore = createOffsetStore(projectionId, deleteSettings)
@@ -635,8 +635,7 @@ class R2dbcTimestampOffsetStoreSpec
"periodically delete old records" in {
val projectionId = genRandomProjectionId()
- val deleteSettings =
- settings.copy(timeWindow = JDuration.ofSeconds(100), deleteInterval =
JDuration.ofMillis(500))
+ val deleteSettings =
settings.withTimeWindow(JDuration.ofSeconds(100)).withDeleteInterval(JDuration.ofMillis(500))
import deleteSettings._
val offsetStore = createOffsetStore(projectionId, deleteSettings)
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
index 13b8771..fc48aff 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
@@ -44,6 +44,21 @@ object TestConfig {
database = "yugabyte"
}
""")
+ case "mysql" =>
+ ConfigFactory.parseString("""
+ pekko.persistence.r2dbc{
+ connection-factory {
+ driver = "mysql"
+ host = "localhost"
+ port = 3306
+ user = "root"
+ password = "root"
+ database = "mysql"
+ }
+ db-timestamp-monotonic-increasing = on
+ use-app-timestamp = on
+ }
+ """)
}
// using load here so that connection-factory can be overridden
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]