This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f50c809 test durable state store tck (#482)
5f50c809 is described below

commit 5f50c809d9923246c480524969998ead4217c51a
Author: PJ Fanning <[email protected]>
AuthorDate: Sat May 16 16:26:14 2026 +0100

    test durable state store tck (#482)
    
    * Copy DurableStateStore TCK base classes from apache/pekko#2833 and add integration tests for all supported databases
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-jdbc/sessions/c3116ae6-042e-496b-b5bc-f480e95f7a01
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix: remove sealed CapabilityFlags inheritance from DurableStateStoreCapabilityFlags
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-jdbc/sessions/dcab8f13-0bc4-403e-afbd-d60bdaa2b03a
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix Oracle TCK failures: use non-empty tag in DurableStateStoreSpec tests
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-jdbc/sessions/491c7f9d-e7f5-446b-8cac-cc96c7c17767
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Bring over latest TCK changes from apache/pekko#2917
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-jdbc/sessions/b8ecac9c-455d-49ad-b6c1-7397780899da
    
    Co-authored-by: pjfanning <[email protected]>
    
    * use tck files from 2.0.0-M2 release
    
    * Fix deleteObject revision semantics to match pekko 2.0.0-M2 TCK
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-jdbc/sessions/146149b4-03f1-4102-b987-773bc5eda38e
    
    Co-authored-by: pjfanning <[email protected]>
    
    * remove use of DeleteRevisionException
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../jdbc/state/DurableStateQueries.scala           |  8 ++--
 .../scaladsl/DurableStateExceptionSupport.scala    | 48 -------------------
 .../state/scaladsl/JdbcDurableStateStore.scala     | 25 +++++++---
 .../jdbc/state/scaladsl/JdbcDurableStateSpec.scala | 28 +++++++++--
 .../scaladsl/JdbcDurableStateStoreTCKSpec.scala    | 54 ++++++++++++++++++++++
 .../integration/DurableStateStoreTCKSpec.scala     | 52 +++++++++++++++++++++
 project/PekkoCoreDependency.scala                  |  2 +-
 7 files changed, 156 insertions(+), 61 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
index b4346100..86bfabad 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
@@ -115,13 +115,15 @@ import slick.jdbc.{
   }
 
   /**
-   * Deletes a particular revision of an object based on its persistenceId.
-   * This revision may no longer exist and if so, no delete will occur.
+   * Deletes the row for a given persistenceId whose revision equals `revision - 1`.
+   * Following the same "next-revision" convention as `upsertObject`, the caller passes
+   * the tombstone revision (= current stored revision + 1), so we delete the row
+   * where `storedRevision = passedRevision - 1`.
    *
    * @since 1.1.0
    */
   private[jdbc] def deleteBasedOnPersistenceIdAndRevision(persistenceId: 
String, revision: Long) = {
-    selectFromDbByPersistenceId(persistenceId).filter(_.revision === 
revision).delete
+    selectFromDbByPersistenceId(persistenceId).filter(_.revision === revision - 1L).delete
   }
 
   def deleteAllFromDb() = {
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
deleted file mode 100644
index 88e19316..00000000
--- 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pekko.persistence.jdbc.state.scaladsl
-
-import java.lang.invoke.{ MethodHandles, MethodType }
-
-import scala.util.Try
-
-/**
- * INTERNAL API
- *
- * Support for creating a `DeleteRevisionException`if the class is
- * available on the classpath. Pekko 1.0 does not have this class, but
- * it is added in Pekko 1.1.
- */
-private[scaladsl] object DurableStateExceptionSupport {
-  val DeleteRevisionExceptionClass =
-    "org.apache.pekko.persistence.state.exception.DeleteRevisionException"
-
-  private def exceptionClassOpt: Option[Class[_]] =
-    Try(Class.forName(DeleteRevisionExceptionClass)).toOption
-
-  private val constructorOpt = exceptionClassOpt.map { clz =>
-    val mt = MethodType.methodType(classOf[Unit], classOf[String])
-    MethodHandles.publicLookup().findConstructor(clz, mt)
-  }
-
-  def createDeleteRevisionExceptionIfSupported(message: String): 
Option[Exception] =
-    constructorOpt.map { constructor =>
-      constructor.invoke(message).asInstanceOf[Exception]
-    }
-
-}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
index 629a5751..c534a8f2 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
@@ -101,7 +101,24 @@ class JdbcDurableStateStore[A](
     Future
       .fromTry(row)
       .flatMap { r =>
-        val action = if (revision == 1) insertDurableState(r) else 
updateDurableState(r)
+        val action = if (revision == 1) insertDurableState(r)
+        else {
+          // Try UPDATE first (entity exists at revision - 1). If 0 rows are 
affected, the
+          // entity may have been deleted (no row at all) rather than having a 
stale revision.
+          // In that case fall back to INSERT so that re-creation after 
deletion works.
+          for {
+            s <- queries.getSequenceNextValueExpr()
+            u <- queries.updateDbWithDurableState(r, s.head)
+            result <-
+              if (u > 0) DBIO.successful(u)
+              else {
+                
queries.selectFromDbByPersistenceId(persistenceId).exists.result.flatMap { 
exists =>
+                  if (!exists) queries.insertDbWithDurableState(r, s.head)
+                  else DBIO.successful(0) // entity exists with wrong revision 
-> propagate 0 to throw below
+                }
+              }
+          } yield result
+        }
         db.run(action)
       }
       .map { rowsAffected =>
@@ -118,16 +135,12 @@ class JdbcDurableStateStore[A](
   override def deleteObject(persistenceId: String, revision: Long): 
Future[Done] =
     db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId, 
revision)).map { count =>
       if (count != 1) {
-        // if you run this code with Pekko 1.0.x, no exception will be thrown 
here
-        // this matches the behavior of pekko-connectors-jdbc 1.0.x
-        // if you run this code with Pekko 1.1.x, a DeleteRevisionException 
will be thrown here
         val msg = if (count == 0) {
           s"Failed to delete object with persistenceId [$persistenceId] and 
revision [$revision]"
         } else {
           s"Delete object succeeded for persistenceId [$persistenceId] and 
revision [$revision] but more than one row was affected ($count rows)"
         }
-        
DurableStateExceptionSupport.createDeleteRevisionExceptionIfSupported(msg)
-          .foreach(throw _)
+        throw new IllegalStateException(msg)
       }
       Done
     }(ExecutionContext.parasitic)
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
index 5f386321..dce1df79 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
@@ -128,7 +128,7 @@ abstract class JdbcDurableStateSpec(config: Config, 
schemaType: SchemaType) exte
         }
       }
     }
-    "fail to delete old object revision" in {
+    "fail to delete with old object revision" in {
       val f = for {
         n <- stateStoreString.upsertObject("p987", 1, "a valid string", "t123")
         _ = n shouldBe pekko.Done
@@ -144,7 +144,7 @@ abstract class JdbcDurableStateSpec(config: Config, 
schemaType: SchemaType) exte
         }
       } else {
         whenReady(f.failed) { e =>
-          e.getClass.getName shouldEqual 
DurableStateExceptionSupport.DeleteRevisionExceptionClass
+          e.getClass shouldEqual classOf[IllegalStateException]
           e.getMessage should include("Failed to delete object with 
persistenceId [p987] and revision [1]")
         }
       }
@@ -159,7 +159,9 @@ abstract class JdbcDurableStateSpec(config: Config, 
schemaType: SchemaType) exte
           _ = g.value shouldBe Some("a valid string")
           u <- stateStoreString.upsertObject("p9876", 2, "updated valid 
string", "t123")
           _ = u shouldBe pekko.Done
-          d <- stateStoreString.deleteObject("p9876", 2)
+          // revision 3 = current stored revision (2) + 1, following the same 
tombstone-revision
+          // convention used by upsertObject and the pekko-persistence TCK
+          d <- stateStoreString.deleteObject("p9876", 3)
           _ = d shouldBe pekko.Done
           h <- stateStoreString.getObject("p9876")
 
@@ -169,6 +171,26 @@ abstract class JdbcDurableStateSpec(config: Config, 
schemaType: SchemaType) exte
         v.value shouldBe None
       }
     }
+    "upsert again after deletion" in {
+      whenReady {
+        for {
+          n <- stateStoreString.upsertObject("p11111", 1, "original string", 
"t123")
+          _ = n shouldBe pekko.Done
+          // tombstone revision = current stored revision (1) + 1 = 2
+          d <- stateStoreString.deleteObject("p11111", 2)
+          _ = d shouldBe pekko.Done
+          g <- stateStoreString.getObject("p11111")
+          _ = g.value shouldBe None
+          // re-insert: revision = tombstone revision (2) + 1 = 3
+          u <- stateStoreString.upsertObject("p11111", 3, "recreated string", 
"t123")
+          _ = u shouldBe pekko.Done
+          h <- stateStoreString.getObject("p11111")
+        } yield h
+      } { v =>
+        v.value shouldBe Some("recreated string")
+        v.revision shouldBe 3L
+      }
+    }
   }
 
   "A durable state store with payload that needs custom serializer" must 
withActorSystem { implicit system =>
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStoreTCKSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStoreTCKSpec.scala
new file mode 100644
index 00000000..6ce9da6f
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStoreTCKSpec.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.persistence.jdbc.state.scaladsl
+
+import com.typesafe.config.Config
+import org.apache.pekko
+import pekko.persistence.CapabilityFlag
+import pekko.persistence.jdbc.config.SlickConfiguration
+import pekko.persistence.jdbc.db.SlickDatabase
+import pekko.persistence.jdbc.testkit.internal.SchemaType
+import pekko.persistence.jdbc.util.DropCreate
+import pekko.persistence.state.DurableStateStoreSpec
+import org.scalatest.BeforeAndAfterAll
+
+abstract class JdbcDurableStateStoreTCKSpec(config: Config, schemaType: 
SchemaType)
+    extends DurableStateStoreSpec(config)
+    with BeforeAndAfterAll
+    with DropCreate {
+
+  override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = 
CapabilityFlag.on()
+
+  override protected def supportsUpsertWithRevisionCheck: CapabilityFlag = 
CapabilityFlag.on()
+
+  override protected def supportsSoftDelete: CapabilityFlag = 
CapabilityFlag.on()
+
+  lazy val db = {
+    val cfg = config.getConfig("jdbc-durable-state-store")
+    if (cfg.hasPath("slick.profile")) {
+      SlickDatabase.database(cfg, new 
SlickConfiguration(cfg.getConfig("slick")), "slick.db")
+    } else {
+      SlickDatabase.database(
+        config,
+        new 
SlickConfiguration(config.getConfig("pekko-persistence-jdbc.shared-databases.slick")),
+        "pekko-persistence-jdbc.shared-databases.slick.db")
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    dropAndCreate(schemaType)
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    db.close()
+  }
+}
diff --git 
a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/DurableStateStoreTCKSpec.scala
 
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/DurableStateStoreTCKSpec.scala
new file mode 100644
index 00000000..e113e634
--- /dev/null
+++ 
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/DurableStateStoreTCKSpec.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.persistence.jdbc.integration
+
+import com.typesafe.config.ConfigFactory
+import 
org.apache.pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStoreTCKSpec
+import org.apache.pekko.persistence.jdbc.testkit.internal.{ MariaDB, MySQL, 
Oracle, Postgres, SqlServer }
+
+// postgres-application.conf already includes pekko.persistence.state.plugin = 
"jdbc-durable-state-store"
+class PostgresDurableStateStoreTCKSpec
+    extends 
JdbcDurableStateStoreTCKSpec(ConfigFactory.load("postgres-application.conf"), 
Postgres)
+
+// mariadb-application.conf already includes pekko.persistence.state.plugin = 
"jdbc-durable-state-store"
+class MariaDBDurableStateStoreTCKSpec
+    extends 
JdbcDurableStateStoreTCKSpec(ConfigFactory.load("mariadb-application.conf"), 
MariaDB)
+
+object MySQLDurableStateStoreTCKSpec {
+  // mysql-application.conf does not configure the durable state plugin, so 
add it here
+  val config = ConfigFactory
+    .parseString("""pekko.persistence.state.plugin = 
"jdbc-durable-state-store"""")
+    .withFallback(ConfigFactory.load("mysql-application.conf"))
+}
+
+class MySQLDurableStateStoreTCKSpec
+    extends JdbcDurableStateStoreTCKSpec(MySQLDurableStateStoreTCKSpec.config, 
MySQL)
+
+object OracleDurableStateStoreTCKSpec {
+  // oracle-application.conf does not configure the durable state plugin, so 
add it here
+  val config = ConfigFactory
+    .parseString("""pekko.persistence.state.plugin = 
"jdbc-durable-state-store"""")
+    .withFallback(ConfigFactory.load("oracle-application.conf"))
+}
+
+class OracleDurableStateStoreTCKSpec
+    extends 
JdbcDurableStateStoreTCKSpec(OracleDurableStateStoreTCKSpec.config, Oracle)
+
+object SqlServerDurableStateStoreTCKSpec {
+  // sqlserver-application.conf does not configure the durable state plugin, 
so add it here
+  val config = ConfigFactory
+    .parseString("""pekko.persistence.state.plugin = 
"jdbc-durable-state-store"""")
+    .withFallback(ConfigFactory.load("sqlserver-application.conf"))
+}
+
+class SqlServerDurableStateStoreTCKSpec
+    extends 
JdbcDurableStateStoreTCKSpec(SqlServerDurableStateStoreTCKSpec.config, 
SqlServer)
diff --git a/project/PekkoCoreDependency.scala 
b/project/PekkoCoreDependency.scala
index c127cb61..37903c6d 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
 object PekkoCoreDependency extends PekkoDependency {
   override val checkProject: String = "pekko-cluster-sharding-typed"
   override val module: Option[String] = None
-  override val currentVersion: String = "2.0.0-M1"
+  override val currentVersion: String = "2.0.0-M2"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to