This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new beee0d1  Projection implementation for MySQL support (#177)
beee0d1 is described below

commit beee0d1bfd0cd46969e8ab940cf8244e7f07b5b3
Author: Domantas Petrauskas <[email protected]>
AuthorDate: Sat Nov 30 13:06:42 2024 +0200

    Projection implementation for MySQL support (#177)
    
    * Projection implementation for MySQL support
    
    * Convert R2dbcProjectionSettings from case class to class
    
    ---------
    
    Co-authored-by: PJ Fanning <[email protected]>
---
 .github/workflows/build-test.yml                   |   2 +-
 .../pekko/persistence/r2dbc/R2dbcSettings.scala    |  20 ++-
 .../pekko/persistence/r2dbc/internal/Sql.scala     |   2 +-
 project/Dependencies.scala                         |   1 +
 .../r2dbcprojectionsettings.excludes               |  51 +++++++
 projection/src/main/resources/reference.conf       |   2 +-
 .../projection/r2dbc/R2dbcProjectionSettings.scala | 160 ++++++++++++++++++---
 .../r2dbc/internal/R2dbcOffsetStore.scala          |  44 ++++--
 .../r2dbc/internal/R2dbcProjectionImpl.scala       |   2 +-
 .../internal/mysql/MySQLR2dbcOffsetStore.scala     |  66 +++++++++
 .../projection/r2dbc/EventSourcedChaosSpec.scala   |   2 +-
 .../r2dbc/EventSourcedEndToEndSpec.scala           |   7 +-
 .../projection/r2dbc/EventSourcedPubSubSpec.scala  |   2 +-
 .../projection/r2dbc/R2dbcOffsetStoreSpec.scala    |   7 +-
 .../projection/r2dbc/R2dbcProjectionSpec.scala     |  46 +++---
 .../r2dbc/R2dbcTimestampOffsetProjectionSpec.scala |  41 +++---
 .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala      |  15 +-
 .../apache/pekko/projection/r2dbc/TestConfig.scala |  15 ++
 18 files changed, 393 insertions(+), 92 deletions(-)

diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index 6e7d7cd..80f310f 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -185,7 +185,7 @@ jobs:
           docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root --database=mysql < ddl-scripts/create_tables_mysql.sql
 
       - name: test
-        run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} core/${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}
+        run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} ${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}
 
   test-docs:
     name: Docs
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index dd8856f..5cbfbff 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -53,13 +53,7 @@ final class R2dbcSettings(config: Config) {
 
   val durableStateAssertSingleWriter: Boolean = config.getBoolean("state.assert-single-writer")
 
-  val dialect: Dialect = toRootLowerCase(config.getString("dialect")) match {
-    case "yugabyte" => Dialect.Yugabyte
-    case "postgres" => Dialect.Postgres
-    case "mysql"    => Dialect.MySQL
-    case other =>
-      throw new IllegalArgumentException(s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres].")
-  }
+  val dialect: Dialect = Dialect.fromString(config.getString("dialect"))
 
   val querySettings = new QuerySettings(config.getConfig("query"))
 
@@ -96,6 +90,18 @@ object Dialect {
 
   /** @since 1.1.0 */
   case object MySQL extends Dialect
+
+  /** @since 1.1.0 */
+  def fromString(value: String): Dialect = {
+    toRootLowerCase(value) match {
+      case "yugabyte" => Dialect.Yugabyte
+      case "postgres" => Dialect.Postgres
+      case "mysql"    => Dialect.MySQL
+      case other =>
+        throw new IllegalArgumentException(
+          s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres, mysql].")
+    }
+  }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
index affe56f..6fc0649 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
@@ -54,7 +54,7 @@ object Sql {
    * INTERNAL API
    */
   @InternalApi
-  private[r2dbc] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal {
+  private[pekko] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal {
     def sql(args: Any*)(implicit dialect: Dialect): String =
       dialect.replaceParameters(trimLineBreaks(sc.s(args: _*)))
   }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index b555938..6eebdc7 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -89,6 +89,7 @@ object Dependencies {
     r2dbcSpi,
     r2dbcPool,
     r2dbcPostgres % "provided,test",
+    r2dbcMysql % "provided,test",
     pekkoProjectionCore,
     TestDeps.pekkoProjectionEventSourced,
     TestDeps.pekkoProjectionDurableState,
diff --git a/projection/src/main/mima-filters/1.0.x.backwards.excludes/r2dbcprojectionsettings.excludes b/projection/src/main/mima-filters/1.0.x.backwards.excludes/r2dbcprojectionsettings.excludes
new file mode 100644
index 0000000..8b28128
--- /dev/null
+++ b/projection/src/main/mima-filters/1.0.x.backwards.excludes/r2dbcprojectionsettings.excludes
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Converting case class to class
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.canEqual")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productArity")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productPrefix")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElement")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElementName")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElementNames")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productIterator")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$1")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$2")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$3")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$4")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$5")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$6")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$7")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$8")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$9")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$10")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._1")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._2")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._3")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._4")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._5")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._6")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._7")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._8")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._9")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._10")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings$")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.unapply")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.fromProduct")
diff --git a/projection/src/main/resources/reference.conf b/projection/src/main/resources/reference.conf
index cf8a83b..67ffe1d 100644
--- a/projection/src/main/resources/reference.conf
+++ b/projection/src/main/resources/reference.conf
@@ -5,7 +5,7 @@
 
 //#projection-config
 pekko.projection.r2dbc {
-  # postgres or yugabyte
+  # postgres, yugabyte or mysql
   dialect = ${pekko.persistence.r2dbc.dialect}
 
   offset-store {
diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
index a2423be..70bc11b 100644
--- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
+++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala
@@ -17,11 +17,12 @@ import java.time.{ Duration => JDuration }
 import java.util.Locale
 
 import scala.concurrent.duration._
-
+import scala.util.hashing.MurmurHash3
+import com.typesafe.config.Config
 import org.apache.pekko
-import pekko.util.JavaDurationConverters._
 import pekko.actor.typed.ActorSystem
-import com.typesafe.config.Config
+import pekko.persistence.r2dbc.Dialect
+import pekko.util.JavaDurationConverters._
 
 object R2dbcProjectionSettings {
 
@@ -34,7 +35,8 @@ object R2dbcProjectionSettings {
         case _     => config.getDuration("log-db-calls-exceeding").asScala
       }
 
-    R2dbcProjectionSettings(
+    new R2dbcProjectionSettings(
+      dialect = Dialect.fromString(config.getString("dialect")),
       schema = Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty),
       offsetTable = config.getString("offset-store.offset-table"),
       timestampOffsetTable = config.getString("offset-store.timestamp-offset-table"),
@@ -44,25 +46,149 @@ object R2dbcProjectionSettings {
       keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"),
       evictInterval = config.getDuration("offset-store.evict-interval"),
       deleteInterval = config.getDuration("offset-store.delete-interval"),
-      logDbCallsExceeding)
+      logDbCallsExceeding
+    )
   }
 
   def apply(system: ActorSystem[_]): R2dbcProjectionSettings =
     apply(system.settings.config.getConfig(DefaultConfigPath))
+
+  def apply(
+      schema: Option[String],
+      offsetTable: String,
+      timestampOffsetTable: String,
+      managementTable: String,
+      useConnectionFactory: String,
+      timeWindow: JDuration,
+      keepNumberOfEntries: Int,
+      evictInterval: JDuration,
+      deleteInterval: JDuration,
+      logDbCallsExceeding: FiniteDuration
+  ): R2dbcProjectionSettings = new R2dbcProjectionSettings(
+    Dialect.Postgres,
+    schema,
+    offsetTable,
+    timestampOffsetTable,
+    managementTable,
+    useConnectionFactory,
+    timeWindow,
+    keepNumberOfEntries,
+    evictInterval,
+    deleteInterval,
+    logDbCallsExceeding
+  )
 }
 
-// FIXME remove case class, and add `with` methods
-final case class R2dbcProjectionSettings(
-    schema: Option[String],
-    offsetTable: String,
-    timestampOffsetTable: String,
-    managementTable: String,
-    useConnectionFactory: String,
-    timeWindow: JDuration,
-    keepNumberOfEntries: Int,
-    evictInterval: JDuration,
-    deleteInterval: JDuration,
-    logDbCallsExceeding: FiniteDuration) {
+final class R2dbcProjectionSettings private (
+    val dialect: Dialect,
+    val schema: Option[String],
+    val offsetTable: String,
+    val timestampOffsetTable: String,
+    val managementTable: String,
+    val useConnectionFactory: String,
+    val timeWindow: JDuration,
+    val keepNumberOfEntries: Int,
+    val evictInterval: JDuration,
+    val deleteInterval: JDuration,
+    val logDbCallsExceeding: FiniteDuration
+) extends Serializable {
+
+  override def toString: String =
+    s"R2dbcProjectionSettings($dialect, $schema, $offsetTable, $timestampOffsetTable, $managementTable, " +
+    s"$useConnectionFactory, $timeWindow, $keepNumberOfEntries, $evictInterval, $deleteInterval, $logDbCallsExceeding)"
+
+  override def equals(other: Any): Boolean =
+    other match {
+      case that: R2dbcProjectionSettings =>
+        dialect == that.dialect && schema == that.schema &&
+        offsetTable == that.offsetTable && timestampOffsetTable == that.timestampOffsetTable &&
+        managementTable == that.managementTable && useConnectionFactory == that.useConnectionFactory &&
+        timeWindow == that.timeWindow && keepNumberOfEntries == that.keepNumberOfEntries &&
+        evictInterval == that.evictInterval && deleteInterval == that.deleteInterval &&
+        logDbCallsExceeding == that.logDbCallsExceeding
+      case _ => false
+    }
+
+  override def hashCode(): Int = {
+    val values = Seq(
+      dialect,
+      schema,
+      offsetTable,
+      timestampOffsetTable,
+      managementTable,
+      useConnectionFactory,
+      timeWindow,
+      keepNumberOfEntries,
+      evictInterval,
+      deleteInterval,
+      logDbCallsExceeding
+    )
+    val h = values.foldLeft(MurmurHash3.productSeed) { case (h, value) =>
+      MurmurHash3.mix(h, value.##)
+    }
+    MurmurHash3.finalizeHash(h, values.size)
+  }
+
+  private[this] def copy(
+      dialect: Dialect = dialect,
+      schema: Option[String] = schema,
+      offsetTable: String = offsetTable,
+      timestampOffsetTable: String = timestampOffsetTable,
+      managementTable: String = managementTable,
+      useConnectionFactory: String = useConnectionFactory,
+      timeWindow: JDuration = timeWindow,
+      keepNumberOfEntries: Int = keepNumberOfEntries,
+      evictInterval: JDuration = evictInterval,
+      deleteInterval: JDuration = deleteInterval,
+      logDbCallsExceeding: FiniteDuration = logDbCallsExceeding
+  ): R2dbcProjectionSettings =
+    new R2dbcProjectionSettings(
+      dialect,
+      schema,
+      offsetTable,
+      timestampOffsetTable,
+      managementTable,
+      useConnectionFactory,
+      timeWindow,
+      keepNumberOfEntries,
+      evictInterval,
+      deleteInterval,
+      logDbCallsExceeding
+    )
+
+  def withDialect(dialect: Dialect): R2dbcProjectionSettings =
+    copy(dialect = dialect)
+
+  def withSchema(schema: Option[String]): R2dbcProjectionSettings =
+    copy(schema = schema)
+
+  def withOffsetTable(offsetTable: String): R2dbcProjectionSettings =
+    copy(offsetTable = offsetTable)
+
+  def withTimestampOffsetTable(timestampOffsetTable: String): R2dbcProjectionSettings =
+    copy(timestampOffsetTable = timestampOffsetTable)
+
+  def withManagementTable(managementTable: String): R2dbcProjectionSettings =
+    copy(managementTable = managementTable)
+
+  def withUseConnectionFactory(useConnectionFactory: String): R2dbcProjectionSettings =
+    copy(useConnectionFactory = useConnectionFactory)
+
+  def withTimeWindow(timeWindow: JDuration): R2dbcProjectionSettings =
+    copy(timeWindow = timeWindow)
+
+  def withKeepNumberOfEntries(keepNumberOfEntries: Int): R2dbcProjectionSettings =
+    copy(keepNumberOfEntries = keepNumberOfEntries)
+
+  def withEvictInterval(evictInterval: JDuration): R2dbcProjectionSettings =
+    copy(evictInterval = evictInterval)
+
+  def withDeleteInterval(deleteInterval: JDuration): R2dbcProjectionSettings =
+    copy(deleteInterval = deleteInterval)
+
+  def withLogDbCallsExceeding(logDbCallsExceeding: FiniteDuration): R2dbcProjectionSettings =
+    copy(logDbCallsExceeding = logDbCallsExceeding)
+
   val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + offsetTable
   val timestampOffsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + timestampOffsetTable
   val managementTableWithSchema: String = schema.map(_ + ".").getOrElse("") + managementTable
diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index 3a0a552..3c612ee 100644
--- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -24,7 +24,6 @@ import scala.annotation.tailrec
 import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.typed.ActorSystem
@@ -38,8 +37,9 @@ import pekko.persistence.query.TimestampOffset
 import pekko.persistence.query.UpdatedDurableState
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
+import pekko.persistence.r2dbc.Dialect
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.typed.PersistenceId
 import pekko.projection.BySlicesSourceProvider
 import pekko.projection.MergeableOffset
@@ -49,6 +49,7 @@ import pekko.projection.internal.OffsetSerialization
 import pekko.projection.internal.OffsetSerialization.MultipleOffsets
 import pekko.projection.internal.OffsetSerialization.SingleOffset
 import pekko.projection.r2dbc.R2dbcProjectionSettings
+import pekko.projection.r2dbc.internal.mysql.MySQLR2dbcOffsetStore
 import io.r2dbc.spi.Connection
 import io.r2dbc.spi.Statement
 import org.slf4j.LoggerFactory
@@ -154,6 +155,22 @@ object R2dbcOffsetStore {
   val FutureDone: Future[Done] = Future.successful(Done)
   val FutureTrue: Future[Boolean] = Future.successful(true)
   val FutureFalse: Future[Boolean] = Future.successful(false)
+
+  def fromConfig(
+      projectionId: ProjectionId,
+      sourceProvider: Option[BySlicesSourceProvider],
+      system: ActorSystem[_],
+      settings: R2dbcProjectionSettings,
+      r2dbcExecutor: R2dbcExecutor,
+      clock: Clock = Clock.systemUTC()
+  ): R2dbcOffsetStore = {
+    settings.dialect match {
+      case Dialect.Postgres | Dialect.Yugabyte =>
+        new R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock)
+      case Dialect.MySQL =>
+        new MySQLR2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock)
+    }
+  }
 }
 
 /**
@@ -170,6 +187,9 @@ private[projection] class R2dbcOffsetStore(
 
   import R2dbcOffsetStore._
 
+  implicit protected val dialect: Dialect = settings.dialect
+  protected lazy val timestampSql: String = "transaction_timestamp()"
+
   // FIXME include projectionId in all log messages
   private val logger = LoggerFactory.getLogger(this.getClass)
 
@@ -181,8 +201,8 @@ private[projection] class R2dbcOffsetStore(
   import offsetSerialization.toStorageRepresentation
 
   private val timestampOffsetTable = settings.timestampOffsetTableWithSchema
-  private val offsetTable = settings.offsetTableWithSchema
-  private val managementTable = settings.managementTableWithSchema
+  protected val offsetTable = settings.offsetTableWithSchema
+  protected val managementTable = settings.managementTableWithSchema
 
   private[projection] implicit val executionContext: ExecutionContext = system.executionContext
 
@@ -195,7 +215,7 @@ private[projection] class R2dbcOffsetStore(
   private val insertTimestampOffsetSql: String = sql"""
     INSERT INTO $timestampOffsetTable
     (projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed)
-    VALUES (?,?,?,?,?,?, transaction_timestamp())"""
+    VALUES (?,?,?,?,?,?, $timestampSql)"""
 
   // delete less than a timestamp
   private val deleteOldTimestampOffsetSql: String =
@@ -211,7 +231,7 @@ private[projection] class R2dbcOffsetStore(
   private val selectOffsetSql: String =
     sql"SELECT projection_key, current_offset, manifest, mergeable FROM $offsetTable WHERE projection_name = ?"
 
-  private val upsertOffsetSql: String = sql"""
+  protected val upsertOffsetSql: String = sql"""
     INSERT INTO $offsetTable
     (projection_name, projection_key, current_offset, manifest, mergeable, last_updated)
     VALUES (?,?,?,?,?,?)
@@ -518,8 +538,10 @@ private[projection] class R2dbcOffsetStore(
     } else {
       // TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low.
       val boundStatement =
-        records.foldLeft(statement) { (stmt, rec) =>
-          stmt.add()
+        records.zipWithIndex.foldLeft(statement) { case (stmt, (rec, idx)) =>
+          if (idx != 0) {
+            stmt.add()
+          }
           bindRecord(stmt, rec)
         }
       R2dbcExecutor.updateBatchInTx(boundStatement)
@@ -979,11 +1001,7 @@ private[projection] class R2dbcOffsetStore(
           .bind(2, paused)
           .bind(3, Instant.now(clock).toEpochMilli)
       }
-      .flatMap {
-        case i if i == 1 => Future.successful(Done)
-        case _ =>
-          Future.failed(new RuntimeException(s"Failed to update management table for $projectionId"))
-      }
+      .map(_ => Done)(ExecutionContexts.parasitic)
   }
 
   private def createRecordWithOffset[Envelope](envelope: Envelope): Option[RecordWithOffset] = {
diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index 5e41ebd..610b442 100644
--- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -87,7 +87,7 @@ private[projection] object R2dbcProjectionImpl {
       connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]) = {
     val r2dbcExecutor =
       new R2dbcExecutor(connectionFactory, log, settings.logDbCallsExceeding)(system.executionContext, system)
-    new R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor)
+    R2dbcOffsetStore.fromConfig(projectionId, sourceProvider, system, settings, r2dbcExecutor)
   }
 
   private val loadEnvelopeCounter = new AtomicLong
diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
new file mode 100644
index 0000000..fe988c0
--- /dev/null
+++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.projection.r2dbc.internal.mysql
+
+import java.time.Clock
+
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.projection.BySlicesSourceProvider
+import pekko.projection.ProjectionId
+import pekko.projection.r2dbc.R2dbcProjectionSettings
+import pekko.projection.r2dbc.internal.R2dbcOffsetStore
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[projection] class MySQLR2dbcOffsetStore(
+    projectionId: ProjectionId,
+    sourceProvider: Option[BySlicesSourceProvider],
+    system: ActorSystem[_],
+    settings: R2dbcProjectionSettings,
+    r2dbcExecutor: R2dbcExecutor,
+    clock: Clock = Clock.systemUTC())
+    extends R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock) {
+
+  override lazy val timestampSql: String = "NOW(6)"
+
+  override val upsertOffsetSql: String = sql"""
+    INSERT INTO $offsetTable
+    (projection_name, projection_key, current_offset, manifest, mergeable, last_updated)
+    VALUES (?,?,?,?,?,?) AS excluded
+    ON DUPLICATE KEY UPDATE
+    current_offset = excluded.current_offset,
+    manifest = excluded.manifest,
+    mergeable = excluded.mergeable,
+    last_updated = excluded.last_updated"""
+
+  override val updateManagementStateSql: String = sql"""
+    INSERT INTO $managementTable
+    (projection_name, projection_key, paused, last_updated)
+    VALUES (?,?,?,?) AS excluded
+    ON DUPLICATE KEY UPDATE
+    paused = excluded.paused,
+    last_updated = excluded.last_updated"""
+}
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala
index a05f510..2e57692 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala
@@ -201,7 +201,7 @@ class EventSourcedChaosSpec
         (1 to expectedEventCounts).foreach { _ =>
           // not using receiveMessages(expectedEvents) for better logging in case of failure
           try {
-            processed :+= processedProbe.receiveMessage(15.seconds)
+            processed :+= processedProbe.receiveMessage(30.seconds)
           } catch {
             case e: AssertionError =>
               val missing = expectedEvents.diff(processed.map(_.envelope.event))
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
index dd9f430..f633fc4 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -19,7 +19,6 @@ import java.util.UUID
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
-
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
@@ -29,8 +28,9 @@ import pekko.actor.typed.ActorSystem
 import pekko.actor.typed.Behavior
 import pekko.actor.typed.scaladsl.Behaviors
 import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.r2dbc.Dialect
 import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import pekko.persistence.typed.PersistenceId
 import pekko.persistence.typed.scaladsl.Effect
@@ -152,6 +152,7 @@ class EventSourcedEndToEndSpec
   // to be able to store events with specific timestamps
   private def writeEvent(persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = {
     log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr: java.lang.Long, event, timestamp)
+    implicit val dialect: Dialect = projectionSettings.dialect
     val insertEventSql = sql"""
       INSERT INTO ${journalSettings.journalTableWithSchema}
       (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
@@ -269,7 +270,7 @@ class EventSourcedEndToEndSpec
       (1 to numberOfEvents).foreach { _ =>
         // not using receiveMessages(expectedEvents) for better logging in case of failure
         try {
-          processed :+= processedProbe.receiveMessage(15.seconds)
+          processed :+= processedProbe.receiveMessage(30.seconds)
         } catch {
           case e: AssertionError =>
             val missing = expectedEvents.diff(processed.map(_.envelope.event))
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
index e3e2e0c..9c88477 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
@@ -149,7 +149,7 @@ class EventSourcedPubSubSpec
     (1 to numberOfEvents).foreach { _ =>
       // not using receiveMessages(expectedEvents) for better logging in case of failure
       try {
-        processed :+= processedProbe.receiveMessage(25.seconds)
+        processed :+= processedProbe.receiveMessage(30.seconds)
       } catch {
         case e: AssertionError =>
           val missing = expectedEvents.diff(processed.map(_.envelope.event))
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
index b9af994..b957eb7 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
@@ -17,14 +17,14 @@ import java.time.Instant
 import java.util.UUID
 
 import scala.concurrent.ExecutionContext
-
 import org.apache.pekko
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.typed.ActorSystem
 import pekko.persistence.query.Sequence
 import pekko.persistence.query.TimeBasedUUID
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.projection.MergeableOffset
 import pekko.projection.ProjectionId
 import pekko.projection.internal.ManagementState
@@ -46,12 +46,13 @@ class R2dbcOffsetStoreSpec
   private val settings = R2dbcProjectionSettings(testKit.system)
 
   private def createOffsetStore(projectionId: ProjectionId) =
-    new R2dbcOffsetStore(projectionId, None, system, settings, r2dbcExecutor, clock)
+    R2dbcOffsetStore.fromConfig(projectionId, None, system, settings, r2dbcExecutor, clock)
 
   private val table = settings.offsetTableWithSchema
 
   private implicit val ec: ExecutionContext = system.executionContext
 
+  implicit val dialect: Dialect = settings.dialect
   def selectLastSql: String =
     sql"SELECT * FROM $table WHERE projection_name = ? AND projection_key = ?"
 
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
index 0560568..90c606d 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
@@ -23,7 +23,6 @@ import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
-
 import org.apache.pekko
 import pekko.Done
 import pekko.NotUsed
@@ -32,8 +31,9 @@ import pekko.actor.testkit.typed.scaladsl.LogCapturing
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.typed.ActorRef
 import pekko.actor.typed.ActorSystem
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.Dialect
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.projection.HandlerRecoveryStrategy
 import pekko.projection.OffsetVerification
 import pekko.projection.OffsetVerification.VerificationFailure
@@ -97,18 +97,21 @@ object R2dbcProjectionSpec {
     val table = "projection_spec_model"
 
     val createTableSql: String =
-      s"""|CREATE table IF NOT EXISTS "$table" (
+      s"""|CREATE table IF NOT EXISTS $table (
           |  id VARCHAR(255) NOT NULL,
           |  concatenated VARCHAR(255) NOT NULL,
           |  PRIMARY KEY(id)
           |);""".stripMargin
   }
 
-  final case class TestRepository(session: R2dbcSession)(implicit ec: ExecutionContext, system: ActorSystem[_]) {
+  final case class TestRepository(session: R2dbcSession, settings: R2dbcProjectionSettings)(
+      implicit ec: ExecutionContext, system: ActorSystem[_]) {
     import TestRepository.table
 
     private val logger = LoggerFactory.getLogger(this.getClass)
 
+    implicit private val dialect: Dialect = settings.dialect
+
     def concatToText(id: String, payload: String): Future[Done] = {
       val savedStrOpt = findById(id)
 
@@ -134,14 +137,23 @@ object R2dbcProjectionSpec {
     private def upsert(concatStr: ConcatStr): Future[Done] = {
       logger.debug("TestRepository.upsert: [{}]", concatStr)
 
-      val stmtSql =
-        sql"""
-          INSERT INTO "$table" (id, concatenated)  VALUES (?, ?)
+      val stmtSql = dialect match {
+        case Dialect.Postgres | Dialect.Yugabyte =>
+          sql"""
+          INSERT INTO $table (id, concatenated)  VALUES (?, ?)
           ON CONFLICT (id)
           DO UPDATE SET
             id = excluded.id,
             concatenated = excluded.concatenated
          """
+        case Dialect.MySQL =>
+          sql"""
+          INSERT INTO $table (id, concatenated)  VALUES (?, ?) AS excluded
+          ON DUPLICATE KEY UPDATE
+            id = excluded.id,
+            concatenated = excluded.concatenated
+         """
+      }
       val stmt = session
         .createStatement(stmtSql)
         .bind(0, concatStr.id)
@@ -182,7 +194,7 @@ class R2dbcProjectionSpec
   private val logger = LoggerFactory.getLogger(getClass)
   private val settings = R2dbcProjectionSettings(testKit.system)
   private def createOffsetStore(projectionId: ProjectionId): R2dbcOffsetStore =
-    new R2dbcOffsetStore(projectionId, None, system, settings, r2dbcExecutor)
+    R2dbcOffsetStore.fromConfig(projectionId, None, system, settings, r2dbcExecutor)
   private val projectionTestKit = ProjectionTestKit(system)
 
   override protected def beforeAll(): Unit = {
@@ -245,7 +257,7 @@ class R2dbcProjectionSpec
   private def withRepo[R](fun: TestRepository => Future[R]): Future[R] = {
     r2dbcExecutor.withConnection("test") { conn =>
       val session = new R2dbcSession(conn)
-      fun(TestRepository(session))
+      fun(TestRepository(session, settings))
     }
   }
 
@@ -260,7 +272,7 @@ class R2dbcProjectionSpec
         throw TestException(concatHandlerFail4Msg + s" after $attempts attempts")
       } else {
         logger.debug("handling {}", envelope)
-        TestRepository(session).concatToText(envelope.id, envelope.message)
+        TestRepository(session, settings).concatToText(envelope.id, envelope.message)
       }
     }
   }
@@ -441,7 +453,7 @@ class R2dbcProjectionSpec
 
       val bogusEventHandler = new R2dbcHandler[Envelope] {
         override def process(session: R2dbcSession, envelope: Envelope): Future[Done] = {
-          val repo = TestRepository(session)
+          val repo = TestRepository(session, settings)
           if (envelope.offset == 4L) repo.updateWithNullValue(envelope.id)
           else repo.concatToText(envelope.id, envelope.message)
         }
@@ -492,7 +504,7 @@ class R2dbcProjectionSpec
           verificationProbe.receiveMessage().offset shouldEqual envelope.offset
           processProbe.ref ! ProbeMessage("process", envelope.offset)
 
-          TestRepository(session).concatToText(envelope.id, envelope.message)
+          TestRepository(session, settings).concatToText(envelope.id, envelope.message)
         }
       }
 
@@ -588,7 +600,7 @@ class R2dbcProjectionSpec
                 if (envelopes.isEmpty)
                   Future.successful(Done)
                 else {
-                  val repo = TestRepository(session)
+                  val repo = TestRepository(session, settings)
                   val id = envelopes.head.id
                   repo.findById(id).flatMap { existing =>
                     val newConcatStr = envelopes.foldLeft(existing.getOrElse(ConcatStr(id, ""))) { (acc, env) =>
@@ -925,7 +937,7 @@ class R2dbcProjectionSpec
             handler = () =>
               R2dbcHandler[Envelope] { (session, envelope) =>
                 verifiedProbe.expectMessage(envelope.offset)
-                TestRepository(session).concatToText(envelope.id, envelope.message)
+                TestRepository(session, settings).concatToText(envelope.id, envelope.message)
               })
 
       projectionTestKit.run(projection) {
@@ -1292,7 +1304,7 @@ class R2dbcProjectionSpec
             sourceProvider = sourceProvider(entityId),
             handler = () =>
               R2dbcHandler[Envelope] { (session, envelope) =>
-                TestRepository(session).concatToText(envelope.id, envelope.message)
+                TestRepository(session, settings).concatToText(envelope.id, envelope.message)
               })
 
       offsetShouldBeEmpty()
@@ -1324,7 +1336,7 @@ class R2dbcProjectionSpec
             sourceProvider = sourceProvider(entityId),
             handler = () =>
               R2dbcHandler[Envelope] { (session, envelope) =>
-                TestRepository(session).concatToText(envelope.id, envelope.message)
+                TestRepository(session, settings).concatToText(envelope.id, envelope.message)
               })
 
       offsetShouldBeEmpty()
@@ -1357,7 +1369,7 @@ class R2dbcProjectionSpec
             sourceProvider = sourceProvider(entityId),
             handler = () =>
               R2dbcHandler[Envelope] { (session, envelope) =>
-                TestRepository(session).concatToText(envelope.id, envelope.message)
+                TestRepository(session, settings).concatToText(envelope.id, envelope.message)
               })
 
       offsetShouldBeEmpty()
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
index 3685242..f24b88f 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
@@ -14,6 +14,7 @@
 package org.apache.pekko.projection.r2dbc
 
 import java.time.Instant
+import java.time.temporal.ChronoUnit
 import java.time.{ Duration => JDuration }
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicBoolean
@@ -25,7 +26,6 @@ import scala.concurrent.Await
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
-
 import org.apache.pekko
 import pekko.Done
 import pekko.NotUsed
@@ -154,7 +154,7 @@ class R2dbcTimestampOffsetProjectionSpec
   private def createOffsetStore(
       projectionId: ProjectionId,
       sourceProvider: TestTimestampSourceProvider): R2dbcOffsetStore =
-    new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor)
+    R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor)
   private val projectionTestKit = ProjectionTestKit(system)
 
   override protected def beforeAll(): Unit = {
@@ -228,7 +228,7 @@ class R2dbcTimestampOffsetProjectionSpec
   private def withRepo[R](fun: TestRepository => Future[R]): Future[R] = {
     r2dbcExecutor.withConnection("test") { conn =>
       val session = new R2dbcSession(conn)
-      fun(TestRepository(session))
+      fun(TestRepository(session, settings))
     }
   }
 
@@ -245,7 +245,7 @@ class R2dbcTimestampOffsetProjectionSpec
         throw TestException(concatHandlerFail4Msg + s" after $attempts attempts")
       } else {
         logger.debug(s"handling {}", envelope)
-        TestRepository(session).concatToText(envelope.persistenceId, envelope.event)
+        TestRepository(session, settings).concatToText(envelope.persistenceId, envelope.event)
       }
     }
 
@@ -287,8 +287,13 @@ class R2dbcTimestampOffsetProjectionSpec
     }
   }
 
+  def now(): Instant = {
+    // supported databases do not store more than 6 fractional digits
+    Instant.now().truncatedTo(ChronoUnit.MICROS)
+  }
+
   def createEnvelopesWithDuplicates(pid1: Pid, pid2: Pid): Vector[EventEnvelope[String]] = {
-    val startTime = Instant.now()
+    val startTime = now()
 
     Vector(
       createEnvelope(pid1, 1, startTime, "e1-1"),
@@ -342,7 +347,7 @@ class R2dbcTimestampOffsetProjectionSpec
       if (envelopes.isEmpty)
         Future.successful(Done)
       else {
-        val repo = TestRepository(session)
+        val repo = TestRepository(session, settings)
         val results = envelopes.groupBy(_.persistenceId).map { case (pid, envs) =>
           repo.findById(pid).flatMap { existing =>
             val newConcatStr = envs.foldLeft(existing.getOrElse(ConcatStr(pid, ""))) { (acc, env) =>
@@ -552,7 +557,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val pid2 = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
 
-      val startTime = Instant.now()
+      val startTime = now()
       val envelopes1 = createEnvelopesUnknownSequenceNumbers(startTime, pid1, pid2)
       val sourceProvider1 = createSourceProvider(envelopes1)
       implicit val offsetStore: R2dbcOffsetStore = createOffsetStore(projectionId, sourceProvider1)
@@ -664,7 +669,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val pid2 = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
 
-      val startTime = Instant.now()
+      val startTime = now()
       val envelopes1 = createEnvelopesUnknownSequenceNumbers(startTime, pid1, pid2)
       val sourceProvider1 = createBacktrackingSourceProvider(envelopes1)
       implicit val offsetStore: R2dbcOffsetStore = createOffsetStore(projectionId, sourceProvider1)
@@ -813,10 +818,10 @@ class R2dbcTimestampOffsetProjectionSpec
       val pid2 = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
 
-      val startTime = Instant.now()
+      val startTime = now()
       val sourceProvider = new TestSourceProviderWithInput()
       implicit val offsetStore: R2dbcOffsetStore =
-        new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor)
+        R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor)
 
       val result1 = new StringBuffer()
       val result2 = new StringBuffer()
@@ -950,10 +955,10 @@ class R2dbcTimestampOffsetProjectionSpec
       val pid2 = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
 
-      val startTime = Instant.now()
+      val startTime = now()
       val sourceProvider = new TestSourceProviderWithInput()
       implicit val offsetStore: R2dbcOffsetStore =
-        new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor)
+        R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor)
 
       val projectionRef = spawn(
         ProjectionBehavior(
@@ -1154,10 +1159,10 @@ class R2dbcTimestampOffsetProjectionSpec
       val pid1 = UUID.randomUUID().toString
       val pid2 = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      val startTime = Instant.now()
+      val startTime = now()
       val sourceProvider = new TestSourceProviderWithInput()
       implicit val offsetStore: R2dbcOffsetStore =
-        new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor)
+        R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor)
 
       val result1 = new StringBuffer()
       val result2 = new StringBuffer()
@@ -1296,10 +1301,10 @@ class R2dbcTimestampOffsetProjectionSpec
       val pid2 = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
 
-      val startTime = Instant.now()
+      val startTime = now()
       val sourceProvider = new TestSourceProviderWithInput()
       implicit val offsetStore: R2dbcOffsetStore =
-        new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor)
+        R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor)
 
       val flowHandler =
         FlowWithContext[EventEnvelope[String], ProjectionContext]
@@ -1393,7 +1398,7 @@ class R2dbcTimestampOffsetProjectionSpec
             sourceProvider,
             handler = () =>
               R2dbcHandler[EventEnvelope[String]] { (session, envelope) =>
-                TestRepository(session).concatToText(envelope.persistenceId, envelope.event)
+                TestRepository(session, settings).concatToText(envelope.persistenceId, envelope.event)
               })
 
       offsetShouldBeEmpty()
@@ -1427,7 +1432,7 @@ class R2dbcTimestampOffsetProjectionSpec
             sourceProvider,
             handler = () =>
               R2dbcHandler[EventEnvelope[String]] { (session, envelope) =>
-                TestRepository(session).concatToText(envelope.persistenceId, envelope.event)
+                TestRepository(session, settings).concatToText(envelope.persistenceId, envelope.event)
               })
 
       offsetShouldBeEmpty()
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
index f1e91cf..0708d74 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
@@ -81,7 +81,7 @@ class R2dbcTimestampOffsetStoreSpec
       projectionId: ProjectionId,
       customSettings: R2dbcProjectionSettings = settings,
       eventTimestampQueryClock: TestClock = clock) =
-    new R2dbcOffsetStore(
+    R2dbcOffsetStore.fromConfig(
       projectionId,
       Some(new TestTimestampSourceProvider(0, persistenceExt.numberOfSlices - 1, eventTimestampQueryClock)),
       system,
@@ -281,7 +281,7 @@ class R2dbcTimestampOffsetStoreSpec
       slice4 shouldBe 656
 
       val offsetStore0 =
-        new R2dbcOffsetStore(
+        R2dbcOffsetStore.fromConfig(
           projectionId0,
           Some(new TestTimestampSourceProvider(0, persistenceExt.numberOfSlices - 1, clock)),
           system,
@@ -302,7 +302,7 @@ class R2dbcTimestampOffsetStoreSpec
       offsetStore0.saveOffset(offset4).futureValue
 
       val offsetStore1 =
-        new R2dbcOffsetStore(
+        R2dbcOffsetStore.fromConfig(
           projectionId1,
           Some(new TestTimestampSourceProvider(0, 511, clock)),
           system,
@@ -312,7 +312,7 @@ class R2dbcTimestampOffsetStoreSpec
       offsetStore1.getState().byPid.keySet shouldBe Set(p1, p2)
 
       val offsetStore2 =
-        new R2dbcOffsetStore(
+        R2dbcOffsetStore.fromConfig(
           projectionId2,
           Some(new TestTimestampSourceProvider(512, 1023, clock)),
           system,
@@ -561,7 +561,7 @@ class R2dbcTimestampOffsetStoreSpec
 
     "evict old records" in {
       val projectionId = genRandomProjectionId()
-      val evictSettings = settings.copy(timeWindow = JDuration.ofSeconds(100), evictInterval = JDuration.ofSeconds(10))
+      val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10))
       import evictSettings._
       val offsetStore = createOffsetStore(projectionId, evictSettings)
 
@@ -604,7 +604,7 @@ class R2dbcTimestampOffsetStoreSpec
 
     "delete old records" in {
       val projectionId = genRandomProjectionId()
-      val deleteSettings = settings.copy(timeWindow = JDuration.ofSeconds(100))
+      val deleteSettings = settings.withTimeWindow(JDuration.ofSeconds(100))
       import deleteSettings._
       val offsetStore = createOffsetStore(projectionId, deleteSettings)
 
@@ -635,8 +635,7 @@ class R2dbcTimestampOffsetStoreSpec
 
     "periodically delete old records" in {
       val projectionId = genRandomProjectionId()
-      val deleteSettings =
-        settings.copy(timeWindow = JDuration.ofSeconds(100), deleteInterval = JDuration.ofMillis(500))
+      val deleteSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withDeleteInterval(JDuration.ofMillis(500))
       import deleteSettings._
       val offsetStore = createOffsetStore(projectionId, deleteSettings)
 
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
index 13b8771..fc48aff 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
@@ -44,6 +44,21 @@ object TestConfig {
             database = "yugabyte"
           }
           """)
+      case "mysql" =>
+        ConfigFactory.parseString("""
+          pekko.persistence.r2dbc{
+            connection-factory {
+              driver = "mysql"
+              host = "localhost"
+              port = 3306
+              user = "root"
+              password = "root"
+              database = "mysql"
+            }
+            db-timestamp-monotonic-increasing = on
+            use-app-timestamp = on
+          }
+          """)
     }
 
     // using load here so that connection-factory can be overridden


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to