This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 719e9eb Core implementation for MySQL support (#175)
719e9eb is described below
commit 719e9eb6dd878409dc56036472d6d408030e08ba
Author: Domantas Petrauskas <[email protected]>
AuthorDate: Fri Nov 15 19:23:09 2024 +0200
Core implementation for MySQL support (#175)
* Core implementation for MySQL support
* Add documentation for mysql-specific configuration
* Addressing various PR comments
* Fix license headers
* Remove spurious projection dependency on mysql
---
.github/workflows/build-test.yml | 55 ++++++++++
core/src/main/resources/reference.conf | 2 +-
.../r2dbc/ConnectionFactoryProvider.scala | 28 ++++--
.../pekko/persistence/r2dbc/R2dbcSettings.scala | 4 +
.../persistence/r2dbc/internal/R2dbcExecutor.scala | 3 +-
.../pekko/persistence/r2dbc/internal/Sql.scala | 30 +++++-
.../persistence/r2dbc/journal/JournalDao.scala | 52 +++++++---
.../persistence/r2dbc/journal/R2dbcJournal.scala | 6 +-
.../r2dbc/journal/mysql/MySQLJournalDao.scala | 70 +++++++++++++
.../r2dbc/query/scaladsl/QueryDao.scala | 36 +++++--
.../r2dbc/query/scaladsl/R2dbcReadJournal.scala | 11 +-
.../r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala | 93 +++++++++++++++++
.../r2dbc/snapshot/R2dbcSnapshotStore.scala | 6 +-
.../persistence/r2dbc/snapshot/SnapshotDao.scala | 31 ++++--
.../r2dbc/snapshot/mysql/MySQLSnapshotDao.scala | 52 ++++++++++
.../r2dbc/state/scaladsl/DurableStateDao.scala | 41 ++++++--
.../state/scaladsl/R2dbcDurableStateStore.scala | 15 +--
.../scaladsl/mysql/MySQLDurableStateDao.scala | 92 +++++++++++++++++
.../pekko/persistence/r2dbc/TestConfig.scala | 15 +++
.../r2dbc/journal/PersistTagsSpec.scala | 11 ++
.../query/EventsBySliceBacktrackingSpec.scala | 5 +-
ddl-scripts/create_tables_mysql.sql | 112 +++++++++++++++++++++
ddl-scripts/drop_tables_mysql.sql | 23 +++++
docker/docker-compose-mysql.yml | 30 ++++++
docs/src/main/paradox/connection-config.md | 3 +
docs/src/test/resources/application-mysql.conf | 32 ++++++
project/Dependencies.scala | 2 +
27 files changed, 780 insertions(+), 80 deletions(-)
diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index a4696d6..6e7d7cd 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -132,6 +132,61 @@ jobs:
- name: test
run: sbt -Dpekko.persistence.r2dbc.dialect=yugabyte
-Dpekko.projection.r2dbc.dialect=yugabyte ++${{ matrix.SCALA_VERSION }} test
+ test-mysql:
+ name: Run tests with MySQL
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ SCALA_VERSION: [ 2.12, 2.13, 3.3 ]
+ JAVA_VERSION: [ 11, 17, 21 ]
+ # only compiling on JDK 8, because certain tests depend on the higher timestamp precision added in JDK 9
+ include:
+ - JAVA_VERSION: 8
+ SCALA_VERSION: 2.12
+ COMPILE_ONLY: true
+ - JAVA_VERSION: 8
+ SCALA_VERSION: 2.13
+ COMPILE_ONLY: true
+ - JAVA_VERSION: 8
+ SCALA_VERSION: 3.3
+ COMPILE_ONLY: true
+ if: github.repository == 'apache/pekko-persistence-r2dbc'
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+ with:
+ fetch-depth: 0
+ fetch-tags: true
+
+ - name: Checkout GitHub merge
+ if: github.event.pull_request
+ run: |-
+ git fetch origin pull/${{ github.event.pull_request.number
}}/merge:scratch
+ git checkout scratch
+
+ - name: Setup Java ${{ matrix.JAVA_VERSION }}
+ uses: actions/setup-java@v4
+ with:
+ distribution: temurin
+ java-version: ${{ matrix.JAVA_VERSION }}
+
+ - name: Install sbt
+ uses: sbt/setup-sbt@v1
+
+ - name: Cache Coursier cache
+ uses: coursier/cache-action@v6
+
+ - name: Enable jvm-opts
+ run: cp .jvmopts-ci .jvmopts
+
+ - name: Start DB
+ run: |-
+ docker compose -f docker/docker-compose-mysql.yml up -d --wait
+ docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root --database=mysql < ddl-scripts/create_tables_mysql.sql
+
+ - name: test
+ run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{
matrix.SCALA_VERSION }} core/${{ matrix.COMPILE_ONLY && 'Test/compile' ||
'test' }}
+
test-docs:
name: Docs
runs-on: ubuntu-latest
diff --git a/core/src/main/resources/reference.conf
b/core/src/main/resources/reference.conf
index 4e205ab..1419a80 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -101,7 +101,7 @@ pekko.persistence.r2dbc {
// #connection-settings
pekko.persistence.r2dbc {
- # postgres or yugabyte
+ # postgres, yugabyte or mysql
dialect = "postgres"
# set this to your database schema if applicable, empty by default
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
index c8af66b..612e765 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
@@ -19,24 +19,25 @@ import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success }
-
import com.typesafe.config.Config
+import io.r2dbc.pool.ConnectionPool
+import io.r2dbc.pool.ConnectionPoolConfiguration
+import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
+import io.r2dbc.postgresql.client.SSLMode
+import io.r2dbc.spi.ConnectionFactories
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.ConnectionFactoryOptions
+import io.r2dbc.spi.Option
import org.apache.pekko
import pekko.Done
import pekko.actor.CoordinatedShutdown
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Extension
import pekko.actor.typed.ExtensionId
-import pekko.persistence.r2dbc.ConnectionFactoryProvider.{
ConnectionFactoryOptionsCustomizer, NoopCustomizer }
+import
pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer
+import pekko.persistence.r2dbc.ConnectionFactoryProvider.NoopCustomizer
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.util.ccompat.JavaConverters._
-import io.r2dbc.pool.ConnectionPool
-import io.r2dbc.pool.ConnectionPoolConfiguration
-import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
-import io.r2dbc.postgresql.client.SSLMode
-import io.r2dbc.spi.ConnectionFactories
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.ConnectionFactoryOptions
object ConnectionFactoryProvider extends
ExtensionId[ConnectionFactoryProvider] {
def createExtension(system: ActorSystem[_]): ConnectionFactoryProvider = new
ConnectionFactoryProvider(system)
@@ -149,6 +150,12 @@ class ConnectionFactoryProvider(system: ActorSystem[_])
extends Extension {
builder.option(PostgresqlConnectionFactoryProvider.SSL_ROOT_CERT,
settings.sslRootCert)
}
+ if (settings.driver == "mysql") {
+ // Either `connectionTimeZone = SERVER` or `forceConnectionTimeZoneToSession = true` needs to be set for
+ // timezones to work correctly, likely because of a bug in https://github.com/asyncer-io/r2dbc-mysql/pull/240.
+ builder.option(Option.valueOf("connectionTimeZone"), "SERVER")
+ }
+
ConnectionFactories.get(customizer(builder, config).build())
}
@@ -158,7 +165,8 @@ class ConnectionFactoryProvider(system: ActorSystem[_])
extends Extension {
val connectionFactory = createConnectionFactory(settings, customizer,
config)
val evictionInterval = {
- import settings.{ maxIdleTime, maxLifeTime }
+ import settings.maxIdleTime
+ import settings.maxLifeTime
if (maxIdleTime <= Duration.Zero && maxLifeTime <= Duration.Zero) {
JDuration.ZERO
} else if (maxIdleTime <= Duration.Zero) {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index 49e4860..dd8856f 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -56,6 +56,7 @@ final class R2dbcSettings(config: Config) {
val dialect: Dialect = toRootLowerCase(config.getString("dialect")) match {
case "yugabyte" => Dialect.Yugabyte
case "postgres" => Dialect.Postgres
+ case "mysql" => Dialect.MySQL
case other =>
- throw new IllegalArgumentException(s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres].")
+ // Keep the dialects listed in this message in sync with the cases matched above.
+ throw new IllegalArgumentException(s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres, mysql].")
}
@@ -92,6 +93,9 @@ sealed trait Dialect
object Dialect {
case object Postgres extends Dialect
case object Yugabyte extends Dialect
+
+ /** @since 1.1.0 */
+ case object MySQL extends Dialect
}
/**
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
index b757efe..894b8fa 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/R2dbcExecutor.scala
@@ -325,7 +325,8 @@ class R2dbcExecutor(val connectionFactory:
ConnectionFactory, log: Logger, logDb
connection.close().asFutureDone().map { _ =>
val durationMicros = durationInMicros(startTime)
if (durationMicros >= logDbCallsExceedingMicros)
- log.info("{} - DB call completed [{}] in [{}] µs", logPrefix,
r.toString, durationMicros: java.lang.Long)
+ log.info("{} - DB call completed [{}] in [{}] µs", logPrefix,
Option(r).map(_.toString).orNull,
+ durationMicros: java.lang.Long)
r
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
index f1884f4..affe56f 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala
@@ -14,8 +14,10 @@
package org.apache.pekko.persistence.r2dbc.internal
import scala.annotation.varargs
-
-import org.apache.pekko.annotation.InternalStableApi
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.annotation.InternalStableApi
+import pekko.persistence.r2dbc.Dialect
/**
* INTERNAL API: Utility to format SQL strings. Replaces `?` with numbered
`\$1`, `\$2` for bind parameters. Trims
@@ -24,6 +26,21 @@ import org.apache.pekko.annotation.InternalStableApi
@InternalStableApi
object Sql {
+  /**
+   * INTERNAL API: dialect-dependent treatment of the `?` bind-parameter placeholders in a SQL string.
+   */
+  @InternalApi
+  private[r2dbc] implicit class DialectOps(dialect: Dialect) {
+
+    /**
+     * For Postgres and Yugabyte the statement is passed through `fillInParameterNumbers`;
+     * for MySQL it is returned unchanged.
+     */
+    def replaceParameters(sql: String): String =
+      dialect match {
+        case Dialect.MySQL                       => sql
+        case Dialect.Postgres | Dialect.Yugabyte => fillInParameterNumbers(sql)
+      }
+  }
+
/**
* Scala string interpolation with `sql` prefix. Replaces `?` with numbered
`\$1`, `\$2` for bind parameters. Trims
* whitespace, including line breaks. Standard string interpolation
arguments `$` can be used.
@@ -33,6 +50,15 @@ object Sql {
fillInParameterNumbers(trimLineBreaks(sc.s(args: _*)))
}
+  /**
+   * INTERNAL API: `sql` string interpolator whose bind-parameter handling is chosen by the implicit
+   * [[Dialect]] in scope. Standard interpolation arguments `$` can be used.
+   */
+  @InternalApi
+  private[r2dbc] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal {
+    def sql(args: Any*)(implicit dialect: Dialect): String = {
+      val interpolated = sc.s(args: _*)
+      val trimmed = trimLineBreaks(interpolated)
+      dialect.replaceParameters(trimmed)
+    }
+  }
+
/**
* Java API: Replaces `?` with numbered `\$1`, `\$2` for bind parameters.
Trims whitespace, including line breaks. The
* arguments are used like in [[java.lang.String.format]].
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index 8ca0198..4434cfd 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -17,20 +17,22 @@ import java.time.Instant
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
-
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
+import io.r2dbc.spi.Statement
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
+import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
import pekko.persistence.typed.PersistenceId
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.Row
-import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -71,6 +73,19 @@ private[r2dbc] object JournalDao {
}
}
+  /**
+   * Creates the journal DAO that matches `journalSettings.dialect`, backed by the connection factory
+   * looked up under `sharedConfigPath + ".connection-factory"`.
+   */
+  def fromConfig(
+      journalSettings: R2dbcSettings,
+      sharedConfigPath: String
+  )(implicit system: ActorSystem[_], ec: ExecutionContext): JournalDao = {
+    val factoryConfigPath = sharedConfigPath + ".connection-factory"
+    val connectionFactory = ConnectionFactoryProvider(system).connectionFactoryFor(factoryConfigPath)
+    journalSettings.dialect match {
+      case Dialect.MySQL                       => new MySQLJournalDao(journalSettings, connectionFactory)
+      case Dialect.Postgres | Dialect.Yugabyte => new JournalDao(journalSettings, connectionFactory)
+    }
+  }
}
/**
@@ -86,13 +101,16 @@ private[r2dbc] class JournalDao(journalSettings:
R2dbcSettings, connectionFactor
import JournalDao.SerializedJournalRow
import JournalDao.log
+ implicit protected val dialect: Dialect = journalSettings.dialect
+ protected lazy val timestampSql: String = "transaction_timestamp()"
+
private val persistenceExt = Persistence(system)
private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log,
journalSettings.logDbCallsExceeding)(ec, system)
- private val journalTable = journalSettings.journalTableWithSchema
+ protected val journalTable: String = journalSettings.journalTableWithSchema
- private val (insertEventWithParameterTimestampSql,
insertEventWithTransactionTimestampSql) = {
+ protected val (insertEventWithParameterTimestampSql: String,
insertEventWithTransactionTimestampSql: String) = {
val baseSql =
s"INSERT INTO $journalTable " +
"(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest,
event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id,
meta_ser_manifest, meta_payload, db_timestamp) " +
@@ -132,7 +150,7 @@ private[r2dbc] class JournalDao(journalSettings:
R2dbcSettings, connectionFactor
private val insertDeleteMarkerSql = sql"""
INSERT INTO $journalTable
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer,
adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)
- VALUES (?, ?, ?, ?, transaction_timestamp(), ?, ?, ?, ?, ?, ?)"""
+ VALUES (?, ?, ?, ?, $timestampSql, ?, ?, ?, ?, ?, ?)"""
/**
* All events must be for the same persistenceId.
@@ -217,12 +235,18 @@ private[r2dbc] class JournalDao(journalSettings:
R2dbcSettings, connectionFactor
result.foreach { _ =>
log.debug("Wrote [{}] events for persistenceId [{}]", 1,
events.head.persistenceId)
}
- result
+ if (useTimestampFromDb) {
+ result
+ } else {
+ result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
+ }
} else {
val result = r2dbcExecutor.updateInBatchReturning(s"batch insert
[$persistenceId], [$totalEvents] events")(
connection =>
- events.foldLeft(connection.createStatement(insertSql)) { (stmt,
write) =>
- stmt.add()
+ events.zipWithIndex.foldLeft(connection.createStatement(insertSql))
{ case (stmt, (write, idx)) =>
+ if (idx != 0) {
+ stmt.add()
+ }
bind(stmt, write)
},
row => row.get(0, classOf[Instant]))
@@ -230,7 +254,11 @@ private[r2dbc] class JournalDao(journalSettings:
R2dbcSettings, connectionFactor
result.foreach { _ =>
log.debug("Wrote [{}] events for persistenceId [{}]", 1,
events.head.persistenceId)
}
- result.map(_.head)(ExecutionContexts.parasitic)
+ if (useTimestampFromDb) {
+ result.map(_.head)(ExecutionContexts.parasitic)
+ } else {
+ result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic)
+ }
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
index eb111cf..d141ebd 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
@@ -36,7 +36,6 @@ import pekko.persistence.PersistentRepr
import pekko.persistence.journal.AsyncWriteJournal
import pekko.persistence.journal.Tagged
import pekko.persistence.query.PersistenceQuery
-import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.internal.PubSub
import pekko.persistence.r2dbc.journal.JournalDao.SerializedEventMetadata
@@ -97,10 +96,7 @@ private[r2dbc] final class R2dbcJournal(config: Config,
cfgPath: String) extends
private val serialization: Serialization =
SerializationExtension(context.system)
private val journalSettings =
R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath))
- private val journalDao =
- new JournalDao(
- journalSettings,
- ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath
+ ".connection-factory"))
+ private val journalDao = JournalDao.fromConfig(journalSettings,
sharedConfigPath)
private val query =
PersistenceQuery(system).readJournalFor[R2dbcReadJournal](sharedConfigPath +
".query")
private val pubSub: Option[PubSub] =
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
new file mode 100644
index 0000000..9adba88
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.journal.mysql
+
+import scala.concurrent.ExecutionContext
+import io.r2dbc.spi.ConnectionFactory
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.journal.JournalDao
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[r2dbc] object MySQLJournalDao {
+
+  /**
+   * Verifies the settings that MySQL support depends on; a setting that is off fails the corresponding
+   * `require` with the message paired with it below.
+   */
+  def settingRequirements(journalSettings: R2dbcSettings): Unit = {
+    val requirements = Seq(
+      // Application timestamps are used because MySQL does not have transaction_timestamp like Postgres.
+      // In future releases they could be tried to be emulated, but the benefits are questionable - no matter
+      // where the timestamps are generated, risk of clock skews remains.
+      journalSettings.useAppTimestamp ->
+      "use-app-timestamp config must be on for MySQL support",
+      // Supporting the non-monotonic increasing timestamps by incrementing the timestamp within the insert
+      // queries based on latest row in the database seems to cause deadlocks when running tests like
+      // PersistTimestampSpec. Possibly this could be fixed.
+      journalSettings.dbTimestampMonotonicIncreasing ->
+      "db-timestamp-monotonic-increasing config must be on for MySQL support")
+
+    // Also, missing RETURNING implementation makes grabbing the timestamp generated by the database less
+    // efficient - this applies for both of the requirements above.
+    requirements.foreach { case (enabled, message) => require(enabled, message) }
+  }
+}
+
+/**
+ * INTERNAL API
+ *
+ * [[JournalDao]] for the MySQL dialect: checks the MySQL setting requirements on construction and replaces
+ * the timestamp expression and the insert statement that takes the timestamp as a bind parameter.
+ */
+@InternalApi
+private[r2dbc] class MySQLJournalDao(
+    journalSettings: R2dbcSettings,
+    connectionFactory: ConnectionFactory)(
+    implicit ec: ExecutionContext, system: ActorSystem[_]
+) extends JournalDao(journalSettings, connectionFactory) {
+  MySQLJournalDao.settingRequirements(journalSettings)
+
+  // Used in place of the base class default, "transaction_timestamp()".
+  override lazy val timestampSql: String = "NOW(6)"
+
+  override val insertEventWithParameterTimestampSql: String = {
+    val insertInto = sql"INSERT INTO $journalTable "
+    val columns =
+      "(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, " +
+      "event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) "
+    val values = "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
+    insertInto + columns + values
+  }
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index 0fff879..46e59b0 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -19,28 +19,42 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
-
+import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
-import pekko.persistence.Persistence
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.JournalDao
import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
+import pekko.persistence.r2dbc.query.scaladsl.mysql.MySQLQueryDao
import pekko.stream.scaladsl.Source
-import io.r2dbc.spi.ConnectionFactory
import org.slf4j.Logger
import org.slf4j.LoggerFactory
object QueryDao {
val log: Logger = LoggerFactory.getLogger(classOf[QueryDao])
+
+  /**
+   * Creates the query DAO that matches `journalSettings.dialect`, backed by the connection factory
+   * looked up under `sharedConfigPath + ".connection-factory"`.
+   */
+  def fromConfig(
+      journalSettings: R2dbcSettings,
+      sharedConfigPath: String
+  )(implicit system: ActorSystem[_], ec: ExecutionContext): QueryDao = {
+    val factoryConfigPath = sharedConfigPath + ".connection-factory"
+    val connectionFactory = ConnectionFactoryProvider(system).connectionFactoryFor(factoryConfigPath)
+    journalSettings.dialect match {
+      case Dialect.MySQL                       => new MySQLQueryDao(journalSettings, connectionFactory)
+      case Dialect.Postgres | Dialect.Yugabyte => new QueryDao(journalSettings, connectionFactory)
+    }
+  }
}
/**
@@ -54,12 +68,15 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings,
connectionFactory: Connec
import JournalDao.readMetadata
import QueryDao.log
- private val journalTable = settings.journalTableWithSchema
+ implicit protected val dialect: Dialect = settings.dialect
+ protected lazy val statementTimestampSql: String = "statement_timestamp()"
+
+ protected val journalTable = settings.journalTableWithSchema
private val currentDbTimestampSql =
"SELECT transaction_timestamp() AS db_timestamp"
- private def eventsBySlicesRangeSql(
+ protected def eventsBySlicesRangeSql(
toDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
backtracking: Boolean,
@@ -96,10 +113,11 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings,
connectionFactory: Connec
settings.dialect match {
case Dialect.Yugabyte => s"slice BETWEEN $minSlice AND $maxSlice"
case Dialect.Postgres => s"slice in (${(minSlice to
maxSlice).mkString(",")})"
+ case unhandled => throw new IllegalArgumentException(s"Unable to
handle dialect [$unhandled]")
}
}
- private def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
+ protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
sql"""
SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*)
AS count
FROM $journalTable
@@ -116,12 +134,12 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings,
connectionFactory: Connec
WHERE persistence_id = ? AND seq_nr = ? AND deleted = false"""
private val selectOneEventSql = sql"""
- SELECT slice, entity_type, db_timestamp, statement_timestamp() AS
read_db_timestamp, event_ser_id, event_ser_manifest, event_payload,
meta_ser_id, meta_ser_manifest, meta_payload
+ SELECT slice, entity_type, db_timestamp, $statementTimestampSql AS
read_db_timestamp, event_ser_id, event_ser_manifest, event_payload,
meta_ser_id, meta_ser_manifest, meta_payload
FROM $journalTable
WHERE persistence_id = ? AND seq_nr = ? AND deleted = false"""
private val selectEventsSql = sql"""
- SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp,
statement_timestamp() AS read_db_timestamp, event_ser_id, event_ser_manifest,
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest,
meta_payload
+ SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest,
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest,
meta_payload
from $journalTable
WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
AND deleted = false
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 91ea310..4280f11 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -19,10 +19,11 @@ import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
-
+import com.typesafe.config.Config
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ExtendedActorSystem
+import pekko.actor.typed.ActorSystem
import pekko.actor.typed.pubsub.Topic
import pekko.actor.typed.scaladsl.adapter._
import pekko.annotation.InternalApi
@@ -48,7 +49,6 @@ import pekko.serialization.SerializationExtension
import pekko.stream.OverflowStrategy
import pekko.stream.scaladsl.Flow
import pekko.stream.scaladsl.Source
-import com.typesafe.config.Config
import org.slf4j.LoggerFactory
object R2dbcReadJournal {
@@ -76,14 +76,13 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
private val sharedConfigPath = cfgPath.replaceAll("""\.query$""", "")
private val settings =
R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))
- private val typedSystem = system.toTyped
+ private implicit val typedSystem: ActorSystem[_] = system.toTyped
import typedSystem.executionContext
private val serialization = SerializationExtension(system)
private val persistenceExt = Persistence(system)
private val connectionFactory = ConnectionFactoryProvider(typedSystem)
.connectionFactoryFor(sharedConfigPath + ".connection-factory")
- private val queryDao =
- new QueryDao(settings, connectionFactory)(typedSystem.executionContext,
typedSystem)
+ private val queryDao = QueryDao.fromConfig(settings, sharedConfigPath)
private val _bySlice: BySliceQuery[SerializedJournalRow, EventEnvelope[Any]]
= {
val createEnvelope: (TimestampOffset, SerializedJournalRow) =>
EventEnvelope[Any] = (offset, row) => {
@@ -108,7 +107,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
private def bySlice[Event]: BySliceQuery[SerializedJournalRow,
EventEnvelope[Event]] =
_bySlice.asInstanceOf[BySliceQuery[SerializedJournalRow,
EventEnvelope[Event]]]
- private val journalDao = new JournalDao(settings,
connectionFactory)(typedSystem.executionContext, typedSystem)
+ private val journalDao = JournalDao.fromConfig(settings, sharedConfigPath)
def extractEntityTypeFromPersistenceId(persistenceId: String): String =
PersistenceId.extractEntityType(persistenceId)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
new file mode 100644
index 0000000..8cbc5b6
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.query.scaladsl.mysql
+
+import java.time.Instant
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.FiniteDuration
+import io.r2dbc.spi.ConnectionFactory
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.query.scaladsl.QueryDao
+
+/**
+ * INTERNAL API
+ *
+ * [[QueryDao]] for the MySQL dialect. Overrides the statement-timestamp expression, the slice-range and
+ * bucket-count queries, and the lookup of the current database timestamp.
+ */
+@InternalApi
+private[r2dbc] class MySQLQueryDao(
+    journalSettings: R2dbcSettings,
+    connectionFactory: ConnectionFactory
+)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends QueryDao(journalSettings, connectionFactory) {
+
+  // Used in place of the base class default, "statement_timestamp()".
+  override lazy val statementTimestampSql: String = "NOW(6)"
+
+  /**
+   * Builds the SELECT for events of one entity type within the slice range `minSlice` to `maxSlice`,
+   * ordered by `db_timestamp, seq_nr` and limited by a bind parameter.
+   *
+   * @param toDbTimestampParam when true an upper bound `db_timestamp <= ?` is added
+   * @param behindCurrentTime when greater than zero, only rows older than the current statement
+   *                          timestamp minus this duration are selected
+   * @param backtracking when true the serialized event and metadata columns are left out
+   */
+  override def eventsBySlicesRangeSql(
+      toDbTimestampParam: Boolean,
+      behindCurrentTime: FiniteDuration,
+      backtracking: Boolean,
+      minSlice: Int,
+      maxSlice: Int): String = {
+
+    def toDbTimestampParamCondition =
+      if (toDbTimestampParam) "AND db_timestamp <= ?" else ""
+
+    def behindCurrentTimeIntervalCondition =
+      if (behindCurrentTime > Duration.Zero)
+        s"AND db_timestamp < DATE_SUB($statementTimestampSql, INTERVAL '${behindCurrentTime.toMicros}' MICROSECOND)"
+      else ""
+
+    val selectColumns = {
+      if (backtracking)
+        s"SELECT slice, persistence_id, seq_nr, db_timestamp, $statementTimestampSql AS read_db_timestamp "
+      else
+        s"SELECT slice, persistence_id, seq_nr, db_timestamp, $statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload "
+    }
+
+    sql"""
+      $selectColumns
+      FROM $journalTable
+      WHERE entity_type = ?
+      AND slice BETWEEN $minSlice AND $maxSlice
+      AND db_timestamp >= ? $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
+      AND deleted = false
+      ORDER BY db_timestamp, seq_nr
+      LIMIT ?"""
+  }
+
+  /**
+   * Builds the query that counts events per 10 second bucket for one entity type within the slice range
+   * `minSlice` to `maxSlice` and a `db_timestamp` interval given by bind parameters.
+   */
+  override def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
+    // `DIV` is MySQL's integer division. The `/` operator returns a DECIMAL, which would make `bucket`
+    // fractional and GROUP BY would then group per second instead of per 10 seconds, unlike the
+    // Postgres statement in QueryDao (`::BIGINT / 10`) that this mirrors.
+    sql"""
+      SELECT CAST(UNIX_TIMESTAMP(db_timestamp) AS SIGNED) DIV 10 AS bucket, count(*) AS count
+      FROM $journalTable
+      WHERE entity_type = ?
+      AND slice BETWEEN $minSlice AND $maxSlice
+      AND db_timestamp >= ? AND db_timestamp <= ?
+      AND deleted = false
+      GROUP BY bucket ORDER BY bucket LIMIT ?
+      """
+  }
+
+  /** Returns the application clock (`Instant.now()`) as an already completed future; no query is issued. */
+  override def currentDbTimestamp(): Future[Instant] = Future.successful(Instant.now())
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
index 9402583..7d637ad 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
@@ -17,7 +17,7 @@ import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.adapter._
import pekko.persistence.{ SelectedSnapshot, SnapshotMetadata,
SnapshotSelectionCriteria }
-import pekko.persistence.r2dbc.{ ConnectionFactoryProvider, R2dbcSettings }
+import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.snapshot.SnapshotStore
import pekko.serialization.{ Serialization, SerializationExtension }
import com.typesafe.config.Config
@@ -59,9 +59,7 @@ private[r2dbc] final class R2dbcSnapshotStore(cfg: Config,
cfgPath: String) exte
private val dao = {
val sharedConfigPath = cfgPath.replaceAll("""\.snapshot$""", "")
val settings =
R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath))
- new SnapshotDao(
- settings,
- ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath
+ ".connection-factory"))
+ SnapshotDao.fromConfig(settings, sharedConfigPath)
}
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria):
Future[Option[SelectedSnapshot]] =
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
index 16968b3..043ff6e 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
@@ -15,19 +15,21 @@ package org.apache.pekko.persistence.r2dbc.snapshot
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
-
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
import pekko.persistence.SnapshotSelectionCriteria
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
+import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.snapshot.mysql.MySQLSnapshotDao
import pekko.persistence.typed.PersistenceId
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.Row
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -66,6 +68,19 @@ private[r2dbc] object SnapshotDao {
row.get("meta_ser_manifest", classOf[String])))
})
+ /**
+ * Creates the snapshot DAO that matches the dialect in the given settings.
+ *
+ * The connection factory is looked up under `sharedConfigPath` + ".connection-factory".
+ * Postgres and Yugabyte share the base [[SnapshotDao]]; MySQL gets [[MySQLSnapshotDao]].
+ *
+ * @param journalSettings plugin settings, including the dialect to select on
+ * @param sharedConfigPath config path of the plugin section that holds `connection-factory`
+ */
+ def fromConfig(
+ journalSettings: R2dbcSettings,
+ sharedConfigPath: String
+ )(implicit system: ActorSystem[_], ec: ExecutionContext): SnapshotDao = {
+ val factoryConfigPath = sharedConfigPath + ".connection-factory"
+ val factory = ConnectionFactoryProvider(system).connectionFactoryFor(factoryConfigPath)
+ journalSettings.dialect match {
+ case Dialect.MySQL => new MySQLSnapshotDao(journalSettings, factory)
+ case Dialect.Postgres | Dialect.Yugabyte => new SnapshotDao(journalSettings, factory)
+ }
+ }
}
/**
@@ -74,16 +89,18 @@ private[r2dbc] object SnapshotDao {
* Class for doing db interaction outside of an actor to avoid mistakes in
future callbacks
*/
@InternalApi
-private[r2dbc] final class SnapshotDao(settings: R2dbcSettings,
connectionFactory: ConnectionFactory)(implicit
+private[r2dbc] class SnapshotDao(settings: R2dbcSettings, connectionFactory:
ConnectionFactory)(implicit
ec: ExecutionContext,
system: ActorSystem[_]) {
import SnapshotDao._
- private val snapshotTable = settings.snapshotsTableWithSchema
+ implicit protected val dialect: Dialect = settings.dialect
+
+ protected val snapshotTable: String = settings.snapshotsTableWithSchema
private val persistenceExt = Persistence(system)
private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log,
settings.logDbCallsExceeding)(ec, system)
- private val upsertSql = sql"""
+ protected val upsertSql = sql"""
INSERT INTO $snapshotTable
(slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot,
ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
new file mode 100644
index 0000000..e725168
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.snapshot.mysql
+
+import scala.concurrent.ExecutionContext
+import io.r2dbc.spi.ConnectionFactory
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.snapshot.SnapshotDao
+
+/**
+ * INTERNAL API
+ *
+ * MySQL variant of [[SnapshotDao]]. The only thing it changes is the upsert statement,
+ * which is rewritten in MySQL's `INSERT ... ON DUPLICATE KEY UPDATE` form.
+ */
+@InternalApi
+private[r2dbc] class MySQLSnapshotDao(
+ settings: R2dbcSettings, connectionFactory: ConnectionFactory
+)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends
SnapshotDao(settings, connectionFactory) {
+
+ // The inserted row is aliased as `excluded`; when the key already exists, every inserted
+ // column except slice, entity_type and persistence_id is overwritten with the new value.
+ // Takes the same eleven placeholders, in the column order listed in the INSERT.
+ override val upsertSql = sql"""
+ INSERT INTO $snapshotTable
+ (slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot,
ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) AS excluded
+ ON DUPLICATE KEY UPDATE
+ seq_nr = excluded.seq_nr,
+ write_timestamp = excluded.write_timestamp,
+ snapshot = excluded.snapshot,
+ ser_id = excluded.ser_id,
+ ser_manifest = excluded.ser_manifest,
+ meta_payload = excluded.meta_payload,
+ meta_ser_id = excluded.meta_ser_id,
+ meta_ser_manifest = excluded.meta_ser_manifest"""
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 03cdc88..80ef95b 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -19,7 +19,9 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
-
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.R2dbcDataIntegrityViolationException
+import io.r2dbc.spi.Statement
import org.apache.pekko
import pekko.Done
import pekko.NotUsed
@@ -27,18 +29,17 @@ import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao
import pekko.persistence.typed.PersistenceId
import pekko.stream.scaladsl.Source
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.R2dbcDataIntegrityViolationException
-import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -61,6 +62,20 @@ import org.slf4j.LoggerFactory
extends BySliceQuery.SerializedRow {
override def seqNr: Long = revision
}
+
+ /**
+ * Creates the durable state DAO that matches the dialect in the given settings.
+ *
+ * The connection factory is looked up under `sharedConfigPath` + ".connection-factory".
+ * Postgres and Yugabyte share the base [[DurableStateDao]]; MySQL gets [[MySQLDurableStateDao]].
+ *
+ * @param journalSettings plugin settings, including the dialect to select on
+ * @param sharedConfigPath config path of the plugin section that holds `connection-factory`
+ */
+ def fromConfig(
+ journalSettings: R2dbcSettings,
+ sharedConfigPath: String
+ )(implicit system: ActorSystem[_], ec: ExecutionContext): DurableStateDao = {
+ val factoryConfigPath = sharedConfigPath + ".connection-factory"
+ val factory = ConnectionFactoryProvider(system).connectionFactoryFor(factoryConfigPath)
+ journalSettings.dialect match {
+ case Dialect.MySQL => new MySQLDurableStateDao(journalSettings, factory)
+ case Dialect.Postgres | Dialect.Yugabyte => new DurableStateDao(journalSettings, factory)
+ }
+ }
}
/**
@@ -75,16 +90,19 @@ private[r2dbc] class DurableStateDao(settings:
R2dbcSettings, connectionFactory:
extends BySliceQuery.Dao[DurableStateDao.SerializedStateRow] {
import DurableStateDao._
+ implicit protected val dialect: Dialect = settings.dialect
+ protected lazy val transactionTimestampSql: String =
"transaction_timestamp()"
+
private val persistenceExt = Persistence(system)
private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log,
settings.logDbCallsExceeding)(ec, system)
- private val stateTable = settings.durableStateTableWithSchema
+ protected val stateTable = settings.durableStateTableWithSchema
private val selectStateSql: String = sql"""
SELECT revision, state_ser_id, state_ser_manifest, state_payload,
db_timestamp
FROM $stateTable WHERE persistence_id = ?"""
- private def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
+ protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
sql"""
SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*)
AS count
FROM $stateTable
@@ -99,20 +117,21 @@ private[r2dbc] class DurableStateDao(settings:
R2dbcSettings, connectionFactory:
settings.dialect match {
case Dialect.Yugabyte => s"slice BETWEEN $minSlice AND $maxSlice"
case Dialect.Postgres => s"slice in (${(minSlice to
maxSlice).mkString(",")})"
+ case unhandled => throw new IllegalArgumentException(s"Unable to
handle dialect [$unhandled]")
}
}
private val insertStateSql: String = sql"""
INSERT INTO $stateTable
(slice, entity_type, persistence_id, revision, state_ser_id,
state_ser_manifest, state_payload, tags, db_timestamp)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, transaction_timestamp())"""
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, $transactionTimestampSql)"""
private val updateStateSql: String = {
val timestamp =
if (settings.dbTimestampMonotonicIncreasing)
- "transaction_timestamp()"
+ s"$transactionTimestampSql"
else
- "GREATEST(transaction_timestamp(), " +
+ s"GREATEST($transactionTimestampSql, " +
s"(SELECT db_timestamp + '1 microsecond'::interval FROM $stateTable
WHERE persistence_id = ? AND revision = ?))"
val revisionCondition =
@@ -141,7 +160,7 @@ private[r2dbc] class DurableStateDao(settings:
R2dbcSettings, connectionFactory:
private val allPersistenceIdsAfterSql =
sql"SELECT persistence_id from $stateTable WHERE persistence_id > ? ORDER
BY persistence_id LIMIT ?"
- private def stateBySlicesRangeSql(
+ protected def stateBySlicesRangeSql(
maxDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
backtracking: Boolean,
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index 546f954..4d7de65 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -16,11 +16,12 @@ package org.apache.pekko.persistence.r2dbc.state.scaladsl
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
-
+import com.typesafe.config.Config
import org.apache.pekko
import pekko.Done
import pekko.NotUsed
import pekko.actor.ExtendedActorSystem
+import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.adapter._
import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
@@ -30,7 +31,6 @@ import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.UpdatedDurableState
import
pekko.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery
import pekko.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
-import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.ContinuousQuery
@@ -40,7 +40,6 @@ import pekko.persistence.state.scaladsl.GetObjectResult
import pekko.serialization.SerializationExtension
import pekko.serialization.Serializers
import pekko.stream.scaladsl.Source
-import com.typesafe.config.Config
import org.slf4j.LoggerFactory
object R2dbcDurableStateStore {
@@ -59,15 +58,11 @@ class R2dbcDurableStateStore[A](system:
ExtendedActorSystem, config: Config, cfg
private val sharedConfigPath = cfgPath.replaceAll("""\.state$""", "")
private val settings =
R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))
- private val typedSystem = system.toTyped
+ private implicit val typedSystem: ActorSystem[_] = system.toTyped
+ implicit val ec: ExecutionContext = system.dispatcher
private val serialization = SerializationExtension(system)
private val persistenceExt = Persistence(system)
- private val stateDao =
- new DurableStateDao(
- settings,
-
ConnectionFactoryProvider(typedSystem).connectionFactoryFor(sharedConfigPath +
".connection-factory"))(
- typedSystem.executionContext,
- typedSystem)
+ private val stateDao = DurableStateDao.fromConfig(settings, sharedConfigPath)
private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]]
= {
val createEnvelope: (TimestampOffset, SerializedStateRow) =>
DurableStateChange[A] = (offset, row) => {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
new file mode 100644
index 0000000..385dc21
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.state.scaladsl.mysql
+
+import java.time.Instant
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.FiniteDuration
+import io.r2dbc.spi.ConnectionFactory
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
+import pekko.persistence.r2dbc.state.scaladsl.DurableStateDao
+
+/**
+ * INTERNAL API
+ *
+ * MySQL variant of [[DurableStateDao]]. It replaces the transaction timestamp expression with
+ * `NOW(6)`, supplies MySQL versions of the bucket-count and slice-range queries, and answers
+ * `currentDbTimestamp` from the application clock.
+ */
+@InternalApi
+private[r2dbc] class MySQLDurableStateDao(
+ settings: R2dbcSettings,
+ connectionFactory: ConnectionFactory
+)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends DurableStateDao(settings, connectionFactory) {
+ // Runs the settings check shared with the MySQL journal DAO when this DAO is created.
+ MySQLJournalDao.settingRequirements(settings)
+
+ // Kept lazy, like the member it overrides: the parent class interpolates it into its own
+ // SQL strings, which are built before this subclass body runs.
+ override lazy val transactionTimestampSql: String = "NOW(6)"
+
+ /**
+ * Builds the MySQL query that counts states per time bucket.
+ *
+ * The bucket is the epoch-second value of `db_timestamp` integer-divided by 10, i.e. one
+ * bucket per 10 seconds. `DIV` is used on purpose: in MySQL the `/` operator performs decimal
+ * division, which would yield a fractional bucket value and one group per second.
+ *
+ * Placeholders, in order: entity type, lower `db_timestamp` bound (inclusive), upper
+ * `db_timestamp` bound (inclusive) and the maximum number of buckets. `minSlice` and
+ * `maxSlice` are written into the SQL text, not bound.
+ */
+ override def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
+ sql"""
+ SELECT CAST(UNIX_TIMESTAMP(db_timestamp) AS SIGNED) DIV 10 AS bucket, count(*) AS count
+ FROM $stateTable
+ WHERE entity_type = ?
+ AND slice BETWEEN $minSlice AND $maxSlice
+ AND db_timestamp >= ? AND db_timestamp <= ?
+ GROUP BY bucket ORDER BY bucket LIMIT ?
+ """
+ }
+
+ /**
+ * Builds the MySQL text of the slice-range durable state query.
+ *
+ * Placeholders, in order: entity type, lower `db_timestamp` bound (inclusive), the upper
+ * `db_timestamp` bound (exclusive, present only when `maxDbTimestampParam` is true) and the
+ * row limit. `minSlice` and `maxSlice` are written into the SQL text, not bound.
+ * Rows are ordered by `db_timestamp, revision`.
+ *
+ * @param maxDbTimestampParam whether to add the `db_timestamp < ?` condition
+ * @param behindCurrentTime when greater than zero, keeps only rows older than `NOW(6)` minus
+ * this duration (expressed in microseconds)
+ * @param backtracking when true, the state serialization columns are not selected
+ * @param minSlice lower end of the `slice BETWEEN` range
+ * @param maxSlice upper end of the `slice BETWEEN` range
+ */
+ override def stateBySlicesRangeSql(
+ maxDbTimestampParam: Boolean,
+ behindCurrentTime: FiniteDuration,
+ backtracking: Boolean,
+ minSlice: Int,
+ maxSlice: Int): String = {
+
+ // Optional upper bound; contributes one extra placeholder when enabled.
+ def maxDbTimestampParamCondition =
+ if (maxDbTimestampParam) s"AND db_timestamp < ?" else ""
+
+ // Optional "stay behind now" condition; the duration is inlined, so it adds no placeholder.
+ def behindCurrentTimeIntervalCondition =
+ if (behindCurrentTime > Duration.Zero)
+ s"AND db_timestamp < DATE_SUB(NOW(6), INTERVAL '${behindCurrentTime.toMicros}' MICROSECOND)"
+ else ""
+
+ // Both variants expose NOW(6) as read_db_timestamp; only the non-backtracking variant
+ // also selects the serialized state columns.
+ val selectColumns =
+ if (backtracking)
+ "SELECT persistence_id, revision, db_timestamp, NOW(6) AS read_db_timestamp "
+ else
+ "SELECT persistence_id, revision, db_timestamp, NOW(6) AS read_db_timestamp, state_ser_id, state_ser_manifest, state_payload "
+
+ sql"""
+ $selectColumns
+ FROM $stateTable
+ WHERE entity_type = ?
+ AND slice BETWEEN $minSlice AND $maxSlice
+ AND db_timestamp >= ? $maxDbTimestampParamCondition $behindCurrentTimeIntervalCondition
+ ORDER BY db_timestamp, revision
+ LIMIT ?"""
+ }
+
+ // Answers with the application clock in an already-completed Future; no statement is
+ // sent to the database.
+ override def currentDbTimestamp(): Future[Instant] = Future.successful(Instant.now())
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
index e54fd8b..161262e 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
@@ -44,6 +44,21 @@ object TestConfig {
database = "yugabyte"
}
""")
+ case "mysql" =>
+ ConfigFactory.parseString("""
+ pekko.persistence.r2dbc{
+ connection-factory {
+ driver = "mysql"
+ host = "localhost"
+ port = 3306
+ user = "root"
+ password = "root"
+ database = "mysql"
+ }
+ db-timestamp-monotonic-increasing = on
+ use-app-timestamp = on
+ }
+ """)
}
// using load here so that connection-factory can be overridden
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
index 7a490ed..d0c6fcc 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
@@ -40,7 +40,18 @@ class PersistTagsSpec
case class Row(pid: String, seqNr: Long, tags: Set[String])
+ private lazy val dialect =
system.settings.config.getString("pekko.persistence.r2dbc.dialect")
+
+ private lazy val testEnabled: Boolean = {
+ // tags are not implemented for MySQL
+ dialect != "mysql"
+ }
+
"Persist tags" should {
+ if (!testEnabled) {
+ info(s"PersistTagsSpec not enabled for $dialect")
+ pending
+ }
"be the same for events stored in same transaction" in {
val numberOfEntities = 9
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index a88010f..d980798 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -16,7 +16,6 @@ package org.apache.pekko.persistence.r2dbc.query
import java.time.Instant
import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@@ -24,11 +23,12 @@ import pekko.actor.typed.ActorSystem
import pekko.persistence.query.NoOffset
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
import pekko.persistence.r2dbc.TestConfig
import pekko.persistence.r2dbc.TestData
import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.serialization.SerializationExtension
@@ -55,6 +55,7 @@ class EventsBySliceBacktrackingSpec
// to be able to store events with specific timestamps
private def writeEvent(slice: Int, persistenceId: String, seqNr: Long,
timestamp: Instant, event: String): Unit = {
log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId,
seqNr: java.lang.Long, event, timestamp)
+ implicit val dialect: Dialect = settings.dialect
val insertEventSql = sql"""
INSERT INTO ${settings.journalTableWithSchema}
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer,
adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
diff --git a/ddl-scripts/create_tables_mysql.sql
b/ddl-scripts/create_tables_mysql.sql
new file mode 100644
index 0000000..67554e7
--- /dev/null
+++ b/ddl-scripts/create_tables_mysql.sql
@@ -0,0 +1,112 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE TABLE IF NOT EXISTS event_journal(
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ db_timestamp TIMESTAMP(6) NOT NULL,
+
+ event_ser_id INTEGER NOT NULL,
+ event_ser_manifest VARCHAR(255) NOT NULL,
+ event_payload BLOB NOT NULL,
+
+ deleted BOOLEAN DEFAULT FALSE NOT NULL,
+ writer VARCHAR(255) NOT NULL,
+ adapter_manifest VARCHAR(255),
+ tags TEXT, -- FIXME no array type, is this the best option?
+
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BLOB,
+
+ PRIMARY KEY(persistence_id, seq_nr)
+);
+
+-- `event_journal_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX event_journal_slice_idx ON event_journal(slice, entity_type,
db_timestamp, seq_nr);
+
+-- Snapshot storage. persistence_id alone is the primary key, so the table holds at most
+-- one row per persistence id.
+-- NOTE(review): a MySQL BLOB column holds at most 65,535 bytes; confirm that is large enough
+-- for `snapshot` and `meta_payload`, otherwise MEDIUMBLOB or LONGBLOB would be needed.
+CREATE TABLE IF NOT EXISTS snapshot(
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ write_timestamp BIGINT NOT NULL,
+ ser_id INTEGER NOT NULL,
+ ser_manifest VARCHAR(255) NOT NULL,
+ snapshot BLOB NOT NULL,
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BLOB,
+
+ PRIMARY KEY(persistence_id)
+);
+
+CREATE TABLE IF NOT EXISTS durable_state (
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ revision BIGINT NOT NULL,
+ db_timestamp TIMESTAMP(6) NOT NULL,
+
+ state_ser_id INTEGER NOT NULL,
+ state_ser_manifest VARCHAR(255),
+ state_payload BLOB NOT NULL,
+ tags TEXT, -- FIXME no array type, is this the best option?
+
+ PRIMARY KEY(persistence_id, revision)
+);
+
+-- `durable_state_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX durable_state_slice_idx ON durable_state(slice, entity_type,
db_timestamp, revision);
+
+-- Primitive offset types are stored in this table.
+-- If only timestamp based offsets are used this table is optional.
+-- Configure pekko.projection.r2dbc.offset-store.offset-table="" if the table
is not created.
+CREATE TABLE IF NOT EXISTS projection_offset_store (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ current_offset VARCHAR(255) NOT NULL,
+ manifest VARCHAR(32) NOT NULL,
+ mergeable BOOLEAN NOT NULL,
+ last_updated BIGINT NOT NULL,
+ PRIMARY KEY(projection_name, projection_key)
+);
+
+-- Timestamp based offsets are stored in this table.
+CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ slice INT NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ -- timestamp_offset is the db_timestamp of the original event
+ timestamp_offset TIMESTAMP(6) NOT NULL,
+ -- timestamp_consumed is when the offset was stored
+ -- the consumer lag is timestamp_consumed - timestamp_offset
+ timestamp_consumed TIMESTAMP(6) NOT NULL,
+ PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
+);
+
+CREATE TABLE IF NOT EXISTS projection_management (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ paused BOOLEAN NOT NULL,
+ last_updated BIGINT NOT NULL,
+ PRIMARY KEY(projection_name, projection_key)
+);
diff --git a/ddl-scripts/drop_tables_mysql.sql
b/ddl-scripts/drop_tables_mysql.sql
new file mode 100644
index 0000000..ecef573
--- /dev/null
+++ b/ddl-scripts/drop_tables_mysql.sql
@@ -0,0 +1,23 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+DROP TABLE IF EXISTS event_journal;
+DROP TABLE IF EXISTS snapshot;
+DROP TABLE IF EXISTS durable_state;
+DROP TABLE IF EXISTS projection_offset_store;
+DROP TABLE IF EXISTS projection_timestamp_offset_store;
+DROP TABLE IF EXISTS projection_management;
diff --git a/docker/docker-compose-mysql.yml b/docker/docker-compose-mysql.yml
new file mode 100644
index 0000000..3a7c069
--- /dev/null
+++ b/docker/docker-compose-mysql.yml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+services:
+ mysql-db:
+ image: mysql:9.1.0
+ container_name: docker-mysql-db-1
+ ports:
+ - 3306:3306
+ environment:
+ MYSQL_ROOT_PASSWORD: root
+ healthcheck:
+ test: [ "CMD", "mysqladmin", "--password=root", "ping", "-h",
"127.0.0.1" ]
+ interval: 1s
+ timeout: 1s
+ retries: 60
diff --git a/docs/src/main/paradox/connection-config.md
b/docs/src/main/paradox/connection-config.md
index 8f6dc82..c96a6b6 100644
--- a/docs/src/main/paradox/connection-config.md
+++ b/docs/src/main/paradox/connection-config.md
@@ -9,6 +9,9 @@ Postgres:
Yugabyte:
: @@snip
[application.conf](/docs/src/test/resources/application-yugabyte.conf) {
#connection-settings }
+MySQL:
+: @@snip [application.conf](/docs/src/test/resources/application-mysql.conf) {
#connection-settings }
+
## Reference configuration
The following can be overridden in your `application.conf` for the connection
settings:
diff --git a/docs/src/test/resources/application-mysql.conf
b/docs/src/test/resources/application-mysql.conf
new file mode 100644
index 0000000..e645b4a
--- /dev/null
+++ b/docs/src/test/resources/application-mysql.conf
@@ -0,0 +1,32 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko.persistence.journal.plugin = "pekko.persistence.r2dbc.journal"
+pekko.persistence.snapshot-store.plugin = "pekko.persistence.r2dbc.snapshot"
+pekko.persistence.state.plugin = "pekko.persistence.r2dbc.state"
+
+// #connection-settings
+pekko.persistence.r2dbc {
+ dialect = "mysql"
+
+ # Plugin-level settings used together with the MySQL dialect. They belong directly under
+ # `pekko.persistence.r2dbc`, not inside `connection-factory`.
+ db-timestamp-monotonic-increasing = on
+ use-app-timestamp = on
+
+ connection-factory {
+ driver = "mysql"
+ host = "localhost"
+ host = ${?DB_HOST}
+ port = 3306
+ database = "mysql"
+ database = ${?DB_NAME}
+ user = "root"
+ user = ${?DB_USER}
+ password = "root"
+ password = ${?DB_PASSWORD}
+
+ # ssl {
+ # enabled = on
+ # mode = "VERIFY_CA"
+ # root-cert = "/path/db_root.crt"
+ # }
+ }
+}
+// #connection-settings
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index c01be9b..b0818d4 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -36,6 +36,7 @@ object Dependencies {
val r2dbcPool = "io.r2dbc" % "r2dbc-pool" % "1.0.2.RELEASE"
val r2dbcPostgres = Seq(
"org.postgresql" % "r2dbc-postgresql" % "1.0.7.RELEASE")
+ val r2dbcMysql = "io.asyncer" % "r2dbc-mysql" % "1.3.0"
}
object TestDeps {
@@ -75,6 +76,7 @@ object Dependencies {
pekkoPersistenceQuery,
r2dbcSpi,
r2dbcPool,
+ r2dbcMysql % "provided,test",
TestDeps.pekkoPersistenceTck,
TestDeps.pekkoStreamTestkit,
TestDeps.pekkoActorTestkitTyped,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]