This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git


The following commit(s) were added to refs/heads/main by this push:
     new 8683f70  projection-r2dbc: support mysql (#495)
8683f70 is described below

commit 8683f703b00491f7e7dd3f8f73df44c24ad8b528
Author: PJ Fanning <[email protected]>
AuthorDate: Mon May 25 16:10:27 2026 +0100

    projection-r2dbc: support mysql (#495)
    
    * try mysql
    
    * Update integration-tests-r2dbc.yml
    
    * Update integration-tests-r2dbc.yml
    
    * Update create_tables_mysql.sql
    
    * Fix MySQL R2DBC issues: batch insert ordering and sql interpolation in tests
    
    * Fix MySQL R2DBC: move dialect-specific fixes into MySQLR2dbcOffsetStore
    
    - Revert insertTimestampOffsetInTx in R2dbcOffsetStore back to original
      add-before-bind ordering (correct for PostgreSQL/Yugabyte)
    - Extract bindTimestampOffsetRecord as a protected helper method
    - Extract executeDeleteOldTimestampOffsets as a protected hook method
    - Make timestampOffsetTable, insertTimestampOffsetSql, and logger protected
      so MySQL subclass can access them
    - Override insertTimestampOffsetInTx in MySQLR2dbcOffsetStore with
      bind-before-add ordering required by the MySQL R2DBC driver
    - Override executeDeleteOldTimestampOffsets in MySQLR2dbcOffsetStore with
      MySQL-compatible SQL: CONCAT() instead of ||, NOT IN (?,?,...) instead
      of = ANY (?), dynamic placeholders for the exclusion list, and handles
      the empty-exclusion-list case
    
    * Fix compile error: replace trace2 with trace in MySQLR2dbcOffsetStore
    
    * Fix MySQL test failures: identifier quoting and timestamp precision (#13)
    
    * fix: MySQL test issues - double-quoted table name and timestamp precision
    
    * fix: make createTableSql dialect-aware, preserving double-quote quoting for postgres/yugabyte
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    
    * Fix savePaused to accept MySQL ON DUPLICATE KEY UPDATE row count and include count in error (#14)
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    
    * Potential fix for pull request finding
    
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
    
    * Add MySQLR2dbcOffsetStoreSqlSpec unit tests for delete SQL correctness (#15)
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    
    * Update MySQLR2dbcOffsetStoreSqlSpec.scala
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
---
 .github/workflows/integration-tests-r2dbc.yml      |  51 ++++++++++
 docker-files/docker-compose-mysql.yml              |  30 ++++++
 r2dbc-int-test/ddl-scripts/create_tables_mysql.sql | 112 +++++++++++++++++++++
 .../r2dbc/EventSourcedEndToEndSpec.scala           |   4 +-
 .../r2dbc/MySQLR2dbcOffsetStoreSqlSpec.scala       |  74 ++++++++++++++
 .../projection/r2dbc/R2dbcOffsetStoreSpec.scala    |   4 +-
 .../projection/r2dbc/R2dbcProjectionSpec.scala     |  14 ++-
 .../r2dbc/R2dbcTimestampOffsetProjectionSpec.scala |   2 +-
 .../r2dbc/internal/R2dbcOffsetStore.scala          |  78 +++++++-------
 .../internal/mysql/MySQLR2dbcOffsetStore.scala     |  93 +++++++++++++++++
 10 files changed, 420 insertions(+), 42 deletions(-)

diff --git a/.github/workflows/integration-tests-r2dbc.yml 
b/.github/workflows/integration-tests-r2dbc.yml
index 9920981..d5319f4 100644
--- a/.github/workflows/integration-tests-r2dbc.yml
+++ b/.github/workflows/integration-tests-r2dbc.yml
@@ -124,3 +124,54 @@ jobs:
       - name: Print logs on failure
         if: ${{ failure() }}
         run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;
+
+  mysql-test:
+    name: Build and Test Integration for R2DBC with MySQL
+    runs-on: ubuntu-22.04
+    if: github.repository == 'apache/pekko-projection'
+    strategy:
+      fail-fast: false
+      matrix:
+        include:
+          - { java-version: 17, scala-version: 2.13,  sbt-opts: '' }
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # 
v6.0.2
+        with:
+          fetch-depth: 0
+          fetch-tags: true
+
+      - name: Checkout GitHub merge
+        if: github.event.pull_request
+        run: |-
+          git fetch origin pull/${{ github.event.pull_request.number 
}}/merge:scratch
+          git checkout scratch
+
+      - name: Cache Coursier cache
+        uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # 
v8.1.0
+
+      - name: Setup JDK ${{ matrix.java-version }}
+        uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # 
v5.2.0
+        with:
+          java-version: ${{ matrix.java-version }}
+          distribution: temurin
+
+      - name: Install sbt
+        uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+
+      - name: Start MySQL
+        run: |-
+          docker compose -f docker-files/docker-compose-mysql.yml up -d --wait
+          docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root 
--password=root --database=mysql < 
r2dbc-int-test/ddl-scripts/create_tables_mysql.sql
+          docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root 
--password=root -e 'CREATE SCHEMA database1;'
+          docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root 
--password=root --database=database1 < 
r2dbc-int-test/ddl-scripts/create_tables_mysql.sql
+          docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root 
--password=root -e 'CREATE SCHEMA database2;'
+          docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root 
--password=root --database=database2 < 
r2dbc-int-test/ddl-scripts/create_tables_mysql.sql
+
+      - name: test
+        run: sbt -Dpekko.persistence.r2dbc.dialect=mysql 
-Dpekko.projection.r2dbc.dialect=mysql ++${{ matrix.scala-version }} 
"r2dbc-int-test/test" ${{ matrix.extraOpts }}
+
+      - name: Print logs on failure
+        if: ${{ failure() }}
+        run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;
diff --git a/docker-files/docker-compose-mysql.yml 
b/docker-files/docker-compose-mysql.yml
new file mode 100644
index 0000000..df19260
--- /dev/null
+++ b/docker-files/docker-compose-mysql.yml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+services:
+  mysql-db:
+    image: mysql:9.5.0
+    container_name: docker-mysql-db-1
+    ports:
+      - 3306:3306
+    environment:
+      MYSQL_ROOT_PASSWORD: root
+    healthcheck:
+      test: [ "CMD", "mysqladmin", "--password=root", "ping", "-h", 
"127.0.0.1" ]
+      interval: 1s
+      timeout: 1s
+      retries: 60
diff --git a/r2dbc-int-test/ddl-scripts/create_tables_mysql.sql 
b/r2dbc-int-test/ddl-scripts/create_tables_mysql.sql
new file mode 100644
index 0000000..e006404
--- /dev/null
+++ b/r2dbc-int-test/ddl-scripts/create_tables_mysql.sql
@@ -0,0 +1,112 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE TABLE IF NOT EXISTS event_journal(
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  db_timestamp TIMESTAMP(6) NOT NULL,
+
+  event_ser_id INTEGER NOT NULL,
+  event_ser_manifest VARCHAR(255) NOT NULL,
+  event_payload BLOB NOT NULL,
+
+  deleted BOOLEAN DEFAULT FALSE NOT NULL,
+  writer VARCHAR(255) NOT NULL,
+  adapter_manifest VARCHAR(255),
+  tags JSON, -- stored as a JSON array of strings
+
+  meta_ser_id INTEGER,
+  meta_ser_manifest VARCHAR(255),
+  meta_payload BLOB,
+
+  PRIMARY KEY(persistence_id, seq_nr)
+);
+
+-- `event_journal_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX event_journal_slice_idx ON event_journal(slice, entity_type, 
db_timestamp, seq_nr);
+
+CREATE TABLE IF NOT EXISTS snapshot(
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  write_timestamp BIGINT NOT NULL,
+  ser_id INTEGER NOT NULL,
+  ser_manifest VARCHAR(255) NOT NULL,
+  snapshot BLOB NOT NULL,
+  meta_ser_id INTEGER,
+  meta_ser_manifest VARCHAR(255),
+  meta_payload BLOB,
+
+  PRIMARY KEY(persistence_id)
+);
+
+CREATE TABLE IF NOT EXISTS durable_state (
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  revision BIGINT NOT NULL,
+  db_timestamp TIMESTAMP(6) NOT NULL,
+
+  state_ser_id INTEGER NOT NULL,
+  state_ser_manifest VARCHAR(255),
+  state_payload BLOB NOT NULL,
+  tags JSON, -- stored as a JSON array of strings
+
+  PRIMARY KEY(persistence_id, revision)
+);
+
+-- `durable_state_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX durable_state_slice_idx ON durable_state(slice, entity_type, 
db_timestamp, revision);
+
+-- Primitive offset types are stored in this table.
+-- If only timestamp based offsets are used this table is optional.
+-- Configure pekko.projection.r2dbc.offset-store.offset-table="" if the table is not created.
+CREATE TABLE IF NOT EXISTS projection_offset_store (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  current_offset VARCHAR(255) NOT NULL,
+  manifest VARCHAR(32) NOT NULL,
+  mergeable BOOLEAN NOT NULL,
+  last_updated BIGINT NOT NULL,
+  PRIMARY KEY(projection_name, projection_key)
+);
+
+-- Timestamp based offsets are stored in this table.
+CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  slice INT NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  -- timestamp_offset is the db_timestamp of the original event
+  timestamp_offset TIMESTAMP(6) NOT NULL,
+  -- timestamp_consumed is when the offset was stored
+  -- the consumer lag is timestamp_consumed - timestamp_offset
+  timestamp_consumed TIMESTAMP(6) NOT NULL,
+  PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
+);
+
+CREATE TABLE IF NOT EXISTS projection_management (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  paused BOOLEAN NOT NULL,
+  last_updated BIGINT NOT NULL,
+  PRIMARY KEY(projection_name, projection_key)
+);
diff --git 
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
 
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
index 5aa1c50..b8b2015 100644
--- 
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++ 
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -32,7 +32,8 @@ import pekko.actor.typed.scaladsl.LoggerOps
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.r2dbc.JournalSettings
 import pekko.persistence.r2dbc.QuerySettings
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import pekko.persistence.typed.PersistenceId
 import pekko.persistence.typed.scaladsl.Effect
@@ -144,6 +145,7 @@ class EventSourcedEndToEndSpec
   private val querySettings = 
QuerySettings(system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
   private val projectionSettings = R2dbcProjectionSettings(system)
   private val stringSerializer = 
SerializationExtension(system).serializerFor(classOf[String])
+  private implicit val dialect: Dialect = projectionSettings.dialect
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
diff --git 
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/MySQLR2dbcOffsetStoreSqlSpec.scala
 
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/MySQLR2dbcOffsetStoreSqlSpec.scala
new file mode 100644
index 0000000..73f1f1e
--- /dev/null
+++ 
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/MySQLR2dbcOffsetStoreSqlSpec.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.projection.r2dbc
+
+import org.apache.pekko.projection.r2dbc.internal.mysql.MySQLR2dbcOffsetStore
+import org.scalatest.TestSuite
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+/**
+ * Unit tests for the SQL strings generated by [[MySQLR2dbcOffsetStore]].
+ *
+ * These tests run without a database and are intended to catch regressions such as the
+ * `NOT CONCAT(...) IN` vs `CONCAT(...) NOT IN` operator-placement bug fixed in
+ * https://github.com/apache/pekko-projection/pull/495.
+ */
+class MySQLR2dbcOffsetStoreSqlSpec extends AnyWordSpec with TestSuite with 
Matchers {
+
+  private val table = "projection.timestamp_offset"
+
+  "MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql" should {
+
+    "produce a simple DELETE when there are no exclusions" in {
+      val sql = MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(table, 
0)
+      sql should include("DELETE FROM")
+      sql should include(table)
+      sql should include("timestamp_offset < ?")
+      (sql should not).include("NOT IN")
+      (sql should not).include("CONCAT")
+    }
+
+    "produce a DELETE with CONCAT…NOT IN for a single exclusion" in {
+      val sql = MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(table, 
1)
+      sql should include("DELETE FROM")
+      sql should include(table)
+      sql should include("timestamp_offset < ?")
+      // Operator must be `CONCAT(…) NOT IN`, not `NOT CONCAT(…) IN`
+      sql should include("CONCAT(persistence_id, '-', seq_nr) NOT IN")
+      (sql should not).include("NOT CONCAT")
+      sql should endWith("NOT IN (?)")
+    }
+
+    "produce a DELETE with CONCAT…NOT IN for multiple exclusions" in {
+      val sql = MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(table, 
3)
+      sql should include("DELETE FROM")
+      sql should include(table)
+      sql should include("timestamp_offset < ?")
+      sql should include("CONCAT(persistence_id, '-', seq_nr) NOT IN")
+      (sql should not).include("NOT CONCAT")
+      sql should endWith("NOT IN (?, ?, ?)")
+    }
+
+    "include the correct number of placeholders" in {
+      for (n <- 1 to 5) {
+        val sql = 
MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(table, n)
+        val expectedPlaceholders = Seq.fill(n)("?").mkString(", ")
+        sql should include(s"NOT IN ($expectedPlaceholders)")
+      }
+    }
+
+    "include the table name in the generated SQL" in {
+      val customTable = "my_schema.my_projection_offset"
+      val sql = 
MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(customTable, 2)
+      sql should startWith(s"DELETE FROM $customTable")
+    }
+  }
+}
diff --git 
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
 
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
index 0276314..0fab868 100644
--- 
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
+++ 
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
@@ -22,7 +22,8 @@ import 
pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.typed.ActorSystem
 import pekko.persistence.query.Sequence
 import pekko.persistence.query.TimeBasedUUID
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.projection.MergeableOffset
 import pekko.projection.ProjectionId
 import pekko.projection.internal.ManagementState
@@ -43,6 +44,7 @@ class R2dbcOffsetStoreSpec
   private val clock = TestClock.nowMillis()
 
   private val settings = R2dbcProjectionSettings(testKit.system)
+  private implicit val dialect: Dialect = settings.dialect
 
   private def createOffsetStore(projectionId: ProjectionId) =
     R2dbcOffsetStore.fromConfig(projectionId, None, system, settings, 
r2dbcExecutor, clock)
diff --git 
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
 
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
index 2aa118d..44e663f 100644
--- 
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
+++ 
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
@@ -33,7 +33,7 @@ import 
pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.typed.ActorRef
 import pekko.actor.typed.ActorSystem
 import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.projection.HandlerRecoveryStrategy
 import pekko.projection.OffsetVerification
@@ -97,12 +97,17 @@ object R2dbcProjectionSpec {
   object TestRepository {
     val table = "projection_spec_model"
 
-    val createTableSql: String =
-      s"""|CREATE table IF NOT EXISTS "$table" (
+    def createTableSql(dialect: Dialect): String = {
+      val quotedTable = dialect match {
+        case Dialect.MySQL => table
+        case _             => s""""$table""""
+      }
+      s"""|CREATE table IF NOT EXISTS $quotedTable (
           |  id VARCHAR(255) NOT NULL,
           |  concatenated VARCHAR(255) NOT NULL,
           |  PRIMARY KEY(id)
           |);""".stripMargin
+    }
   }
 
   final case class TestRepository(session: R2dbcSession, settings: 
R2dbcProjectionSettings)(
@@ -110,6 +115,7 @@ object R2dbcProjectionSpec {
     import TestRepository.table
 
     private val logger = LoggerFactory.getLogger(this.getClass)
+    private implicit val dialect: Dialect = settings.dialect
 
     def concatToText(id: String, payload: String): Future[Done] = {
       val savedStrOpt = findById(id)
@@ -200,7 +206,7 @@ class R2dbcProjectionSpec
     super.beforeAll()
 
     Await.result(r2dbcExecutor.executeDdl("beforeAll createTable") { conn =>
-        conn.createStatement(TestRepository.createTableSql)
+        conn.createStatement(TestRepository.createTableSql(settings.dialect))
       }, 10.seconds)
     Await.result(
       r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete 
from ${TestRepository.table}")),
diff --git 
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
 
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
index 0451967..349a726 100644
--- 
a/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
+++ 
b/r2dbc-int-test/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
@@ -162,7 +162,7 @@ class R2dbcTimestampOffsetProjectionSpec
     super.beforeAll()
 
     Await.result(r2dbcExecutor.executeDdl("beforeAll createTable") { conn =>
-        conn.createStatement(TestRepository.createTableSql)
+        conn.createStatement(TestRepository.createTableSql(settings.dialect))
       }, 10.seconds)
     Await.result(
       r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete 
from ${TestRepository.table}")),
diff --git 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index 7f7c2e5..e7f14a0 100644
--- 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++ 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -230,7 +230,7 @@ private[projection] class R2dbcOffsetStore(
   protected lazy val timestampSql: String = "transaction_timestamp()"
 
   // FIXME include projectionId in all log messages
-  private val logger = LoggerFactory.getLogger(this.getClass)
+  protected val logger = LoggerFactory.getLogger(this.getClass)
 
   private val persistenceExt = Persistence(system)
 
@@ -240,7 +240,7 @@ private[projection] class R2dbcOffsetStore(
   import offsetSerialization.fromStorageRepresentation
   import offsetSerialization.toStorageRepresentation
 
-  private val timestampOffsetTable = settings.timestampOffsetTableWithSchema
+  protected val timestampOffsetTable: String = 
settings.timestampOffsetTableWithSchema
   protected val offsetTable: String = settings.offsetTableWithSchema
   protected val managementTable: String = settings.managementTableWithSchema
 
@@ -250,7 +250,7 @@ private[projection] class R2dbcOffsetStore(
     SELECT slice, persistence_id, seq_nr, timestamp_offset
     FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name 
= ?"""
 
-  private val insertTimestampOffsetSql: String = sql"""
+  protected val insertTimestampOffsetSql: String = sql"""
     INSERT INTO $timestampOffsetTable
     (projection_name, projection_key, slice, persistence_id, seq_nr, 
timestamp_offset, timestamp_consumed)
     VALUES (?,?,?,?,?,?, $timestampSql)"""
@@ -559,25 +559,25 @@ private[projection] class R2dbcOffsetStore(
     }
   }
 
-  private def insertTimestampOffsetInTx(conn: Connection, records: 
immutable.IndexedSeq[Record]): Future[Long] = {
-    def bindRecord(stmt: Statement, record: Record): Statement = {
-      val slice = persistenceExt.sliceForPersistenceId(record.pid)
-      val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
-      val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
-      if (slice < minSlice || slice > maxSlice)
-        throw new IllegalArgumentException(
-          s"This offset store [$projectionId] manages slices " +
-          s"[$minSlice - $maxSlice] but received slice [$slice] for 
persistenceId [${record.pid}]")
-
-      stmt
-        .bind(0, projectionId.name)
-        .bind(1, projectionId.key)
-        .bind(2, slice)
-        .bind(3, record.pid)
-        .bind(4, record.seqNr)
-        .bind(5, record.timestamp)
-    }
+  protected def bindTimestampOffsetRecord(stmt: Statement, record: Record): 
Statement = {
+    val slice = persistenceExt.sliceForPersistenceId(record.pid)
+    val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
+    val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
+    if (slice < minSlice || slice > maxSlice)
+      throw new IllegalArgumentException(
+        s"This offset store [$projectionId] manages slices " +
+        s"[$minSlice - $maxSlice] but received slice [$slice] for 
persistenceId [${record.pid}]")
+
+    stmt
+      .bind(0, projectionId.name)
+      .bind(1, projectionId.key)
+      .bind(2, slice)
+      .bind(3, record.pid)
+      .bind(4, record.seqNr)
+      .bind(5, record.timestamp)
+  }
 
+  protected def insertTimestampOffsetInTx(conn: Connection, records: 
immutable.IndexedSeq[Record]): Future[Long] = {
     require(records.nonEmpty)
 
     logger.trace2("saving timestamp offset [{}], {}", records.last.timestamp, 
records)
@@ -585,14 +585,14 @@ private[projection] class R2dbcOffsetStore(
     val statement = conn.createStatement(insertTimestampOffsetSql)
 
     if (records.size == 1) {
-      val boundStatement = bindRecord(statement, records.head)
+      val boundStatement = bindTimestampOffsetRecord(statement, records.head)
       R2dbcExecutor.updateOneInTx(boundStatement)
     } else {
       // TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low.
       val boundStatement =
         records.foldLeft(statement) { (stmt, rec) =>
           stmt.add()
-          bindRecord(stmt, rec)
+          bindTimestampOffsetRecord(stmt, rec)
         }
       R2dbcExecutor.updateBatchInTx(boundStatement)
     }
@@ -885,15 +885,7 @@ private[projection] class R2dbcOffsetStore(
             s"${record.pid}-${record.seqNr}"
         }.toArray
 
-        val result = r2dbcExecutor.updateOne("delete old timestamp offset") { 
conn =>
-          conn
-            .createStatement(deleteOldTimestampOffsetSql)
-            .bind(0, minSlice)
-            .bind(1, maxSlice)
-            .bind(2, projectionId.name)
-            .bind(3, until)
-            .bind(4, notInLatestBySlice)
-        }
+        val result = executeDeleteOldTimestampOffsets(minSlice, maxSlice, 
until, notInLatestBySlice)
 
         result.failed.foreach { exc =>
           idle.set(false) // try again next tick
@@ -917,6 +909,22 @@ private[projection] class R2dbcOffsetStore(
     }
   }
 
+  protected def executeDeleteOldTimestampOffsets(
+      minSlice: Int,
+      maxSlice: Int,
+      until: Instant,
+      notInLatestBySlice: Array[String]): Future[Long] = {
+    r2dbcExecutor.updateOne("delete old timestamp offset") { conn =>
+      conn
+        .createStatement(deleteOldTimestampOffsetSql)
+        .bind(0, minSlice)
+        .bind(1, maxSlice)
+        .bind(2, projectionId.name)
+        .bind(3, until)
+        .bind(4, notInLatestBySlice)
+    }
+  }
+
   /**
    * Resetting an offset. Deletes newer offsets. Used from ProjectionManagement. Doesn't update in-memory state because
    * the projection is supposed to be stopped/started for this operation.
@@ -1057,9 +1065,9 @@ private[projection] class R2dbcOffsetStore(
           .bind(3, Instant.now(clock).toEpochMilli)
       }
       .flatMap {
-        case i if i == 1 => Future.successful(Done)
-        case _           =>
-          Future.failed(new RuntimeException(s"Failed to update management 
table for $projectionId"))
+        case i if i >= 1 => Future.successful(Done)
+        case i           =>
+          Future.failed(new RuntimeException(s"Failed to update management 
table for $projectionId, row count: $i"))
       }
   }
 
diff --git 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
index fe988c0..af70295 100644
--- 
a/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
+++ 
b/r2dbc/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala
@@ -20,6 +20,10 @@
 package org.apache.pekko.projection.r2dbc.internal.mysql
 
 import java.time.Clock
+import java.time.Instant
+
+import scala.collection.immutable
+import scala.concurrent.Future
 
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
@@ -30,6 +34,29 @@ import pekko.projection.BySlicesSourceProvider
 import pekko.projection.ProjectionId
 import pekko.projection.r2dbc.R2dbcProjectionSettings
 import pekko.projection.r2dbc.internal.R2dbcOffsetStore
+import io.r2dbc.spi.Connection
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[projection] object MySQLR2dbcOffsetStore {
+
+  /**
+   * Builds the SQL string for deleting old timestamp offsets. When `notInCount` is 0 the plain
+   * deletion query (no exclusion list) is returned. When `notInCount > 0` the query uses
+   * `CONCAT(persistence_id, '-', seq_nr) NOT IN (?, …)` — note the placement of `NOT IN` rather
+   * than `NOT CONCAT(…) IN`, which is invalid SQL.
+   */
+  private[projection] def buildDeleteOldTimestampOffsetsSql(tableName: String, 
notInCount: Int): String = {
+    if (notInCount == 0) {
+      s"DELETE FROM $tableName WHERE slice BETWEEN ? AND ? AND projection_name 
= ? AND timestamp_offset < ?"
+    } else {
+      val placeholders = Seq.fill(notInCount)("?").mkString(", ")
+      s"DELETE FROM $tableName WHERE slice BETWEEN ? AND ? AND projection_name 
= ? AND timestamp_offset < ? AND CONCAT(persistence_id, '-', seq_nr) NOT IN 
($placeholders)"
+    }
+  }
+}
 
 /**
  * INTERNAL API
@@ -63,4 +90,70 @@ private[projection] class MySQLR2dbcOffsetStore(
     ON DUPLICATE KEY UPDATE
     paused = excluded.paused,
     last_updated = excluded.last_updated"""
+
+  /**
+   * MySQL's r2dbc driver validates that all parameters are bound before `add()` is called
+   * on a batch statement, unlike PostgreSQL's driver. We therefore bind the first record
+   * before folding over the remaining records with `add()`.
+   */
+  override protected def insertTimestampOffsetInTx(
+      conn: Connection,
+      records: immutable.IndexedSeq[R2dbcOffsetStore.Record]): Future[Long] = {
+    require(records.nonEmpty)
+
+    logger.trace("saving timestamp offset [{}], {}", records.last.timestamp, 
records)
+
+    val statement = conn.createStatement(insertTimestampOffsetSql)
+
+    if (records.size == 1) {
+      val boundStatement = bindTimestampOffsetRecord(statement, records.head)
+      R2dbcExecutor.updateOneInTx(boundStatement)
+    } else {
+      // Bind the first record before calling add() for the rest; MySQL validates all parameters
+      // are bound on the current batch row before accepting add().
+      val boundStatement =
+        records.tail.foldLeft(bindTimestampOffsetRecord(statement, 
records.head)) { (stmt, rec) =>
+          stmt.add()
+          bindTimestampOffsetRecord(stmt, rec)
+        }
+      R2dbcExecutor.updateBatchInTx(boundStatement)
+    }
+  }
+
+  /**
+   * MySQL does not support `= ANY (?)` with an array parameter or the `||` string concatenation
+   * operator. This override builds the DELETE SQL dynamically using `CONCAT()` and
+   * `NOT IN (?, ?, ...)` with one placeholder per exclusion entry.
+   */
+  override protected def executeDeleteOldTimestampOffsets(
+      minSlice: Int,
+      maxSlice: Int,
+      until: Instant,
+      notInLatestBySlice: Array[String]): Future[Long] = {
+    r2dbcExecutor.updateOne("delete old timestamp offset") { conn =>
+      val stmt = if (notInLatestBySlice.isEmpty) {
+        conn
+          .createStatement(
+            
MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(timestampOffsetTable, 
0))
+          .bind(0, minSlice)
+          .bind(1, maxSlice)
+          .bind(2, projectionId.name)
+          .bind(3, until)
+      } else {
+        val s = conn
+          .createStatement(
+            MySQLR2dbcOffsetStore.buildDeleteOldTimestampOffsetsSql(
+              timestampOffsetTable,
+              notInLatestBySlice.length))
+          .bind(0, minSlice)
+          .bind(1, maxSlice)
+          .bind(2, projectionId.name)
+          .bind(3, until)
+        notInLatestBySlice.zipWithIndex.foldLeft(s) { case (st, (value, idx)) 
=>
+          st.bind(4 + idx, value)
+        }
+      }
+      stmt
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to