This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new 5f50c809 test durable state store tck (#482)
5f50c809 is described below
commit 5f50c809d9923246c480524969998ead4217c51a
Author: PJ Fanning <[email protected]>
AuthorDate: Sat May 16 16:26:14 2026 +0100
test durable state store tck (#482)
* Copy DurableStateStore TCK base classes from apache/pekko#2833 and add
integration tests for all supported databases
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-jdbc/sessions/c3116ae6-042e-496b-b5bc-f480e95f7a01
Co-authored-by: pjfanning <[email protected]>
* Fix: remove sealed CapabilityFlags inheritance from
DurableStateStoreCapabilityFlags
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-jdbc/sessions/dcab8f13-0bc4-403e-afbd-d60bdaa2b03a
Co-authored-by: pjfanning <[email protected]>
* Fix Oracle TCK failures: use non-empty tag in DurableStateStoreSpec tests
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-jdbc/sessions/491c7f9d-e7f5-446b-8cac-cc96c7c17767
Co-authored-by: pjfanning <[email protected]>
* Bring over latest TCK changes from apache/pekko#2917
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-jdbc/sessions/b8ecac9c-455d-49ad-b6c1-7397780899da
Co-authored-by: pjfanning <[email protected]>
* use tck files from 2.0.0-M2 release
* Fix deleteObject revision semantics to match pekko 2.0.0-M2 TCK
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-jdbc/sessions/146149b4-03f1-4102-b987-773bc5eda38e
Co-authored-by: pjfanning <[email protected]>
* remove use of DeleteRevisionException
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../jdbc/state/DurableStateQueries.scala | 8 ++--
.../scaladsl/DurableStateExceptionSupport.scala | 48 -------------------
.../state/scaladsl/JdbcDurableStateStore.scala | 25 +++++++---
.../jdbc/state/scaladsl/JdbcDurableStateSpec.scala | 28 +++++++++--
.../scaladsl/JdbcDurableStateStoreTCKSpec.scala | 54 ++++++++++++++++++++++
.../integration/DurableStateStoreTCKSpec.scala | 52 +++++++++++++++++++++
project/PekkoCoreDependency.scala | 2 +-
7 files changed, 156 insertions(+), 61 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
index b4346100..86bfabad 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala
@@ -115,13 +115,15 @@ import slick.jdbc.{
}
/**
- * Deletes a particular revision of an object based on its persistenceId.
- * This revision may no longer exist and if so, no delete will occur.
+ * Deletes the row for a given persistenceId whose revision equals `revision
- 1`.
+ * Following the same "next-revision" convention as `upsertObject`, the
caller passes
+ * the tombstone revision (= current stored revision + 1), so we delete the
row
+ * where `storedRevision = passedRevision - 1`.
*
* @since 1.1.0
*/
private[jdbc] def deleteBasedOnPersistenceIdAndRevision(persistenceId:
String, revision: Long) = {
- selectFromDbByPersistenceId(persistenceId).filter(_.revision ===
revision).delete
+ selectFromDbByPersistenceId(persistenceId).filter(_.revision === revision
- 1L).delete
}
def deleteAllFromDb() = {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
deleted file mode 100644
index 88e19316..00000000
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pekko.persistence.jdbc.state.scaladsl
-
-import java.lang.invoke.{ MethodHandles, MethodType }
-
-import scala.util.Try
-
-/**
- * INTERNAL API
- *
- * Support for creating a `DeleteRevisionException`if the class is
- * available on the classpath. Pekko 1.0 does not have this class, but
- * it is added in Pekko 1.1.
- */
-private[scaladsl] object DurableStateExceptionSupport {
- val DeleteRevisionExceptionClass =
- "org.apache.pekko.persistence.state.exception.DeleteRevisionException"
-
- private def exceptionClassOpt: Option[Class[_]] =
- Try(Class.forName(DeleteRevisionExceptionClass)).toOption
-
- private val constructorOpt = exceptionClassOpt.map { clz =>
- val mt = MethodType.methodType(classOf[Unit], classOf[String])
- MethodHandles.publicLookup().findConstructor(clz, mt)
- }
-
- def createDeleteRevisionExceptionIfSupported(message: String):
Option[Exception] =
- constructorOpt.map { constructor =>
- constructor.invoke(message).asInstanceOf[Exception]
- }
-
-}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
index 629a5751..c534a8f2 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala
@@ -101,7 +101,24 @@ class JdbcDurableStateStore[A](
Future
.fromTry(row)
.flatMap { r =>
- val action = if (revision == 1) insertDurableState(r) else
updateDurableState(r)
+ val action = if (revision == 1) insertDurableState(r)
+ else {
+ // Try UPDATE first (entity exists at revision - 1). If 0 rows are
affected, the
+ // entity may have been deleted (no row at all) rather than having a
stale revision.
+ // In that case fall back to INSERT so that re-creation after
deletion works.
+ for {
+ s <- queries.getSequenceNextValueExpr()
+ u <- queries.updateDbWithDurableState(r, s.head)
+ result <-
+ if (u > 0) DBIO.successful(u)
+ else {
+
queries.selectFromDbByPersistenceId(persistenceId).exists.result.flatMap {
exists =>
+ if (!exists) queries.insertDbWithDurableState(r, s.head)
+ else DBIO.successful(0) // entity exists with wrong revision
-> propagate 0 to throw below
+ }
+ }
+ } yield result
+ }
db.run(action)
}
.map { rowsAffected =>
@@ -118,16 +135,12 @@ class JdbcDurableStateStore[A](
override def deleteObject(persistenceId: String, revision: Long):
Future[Done] =
db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId,
revision)).map { count =>
if (count != 1) {
- // if you run this code with Pekko 1.0.x, no exception will be thrown
here
- // this matches the behavior of pekko-connectors-jdbc 1.0.x
- // if you run this code with Pekko 1.1.x, a DeleteRevisionException
will be thrown here
val msg = if (count == 0) {
s"Failed to delete object with persistenceId [$persistenceId] and
revision [$revision]"
} else {
s"Delete object succeeded for persistenceId [$persistenceId] and
revision [$revision] but more than one row was affected ($count rows)"
}
-
DurableStateExceptionSupport.createDeleteRevisionExceptionIfSupported(msg)
- .foreach(throw _)
+ throw new IllegalStateException(msg)
}
Done
}(ExecutionContext.parasitic)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
index 5f386321..dce1df79 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala
@@ -128,7 +128,7 @@ abstract class JdbcDurableStateSpec(config: Config,
schemaType: SchemaType) exte
}
}
}
- "fail to delete old object revision" in {
+ "fail to delete with old object revision" in {
val f = for {
n <- stateStoreString.upsertObject("p987", 1, "a valid string", "t123")
_ = n shouldBe pekko.Done
@@ -144,7 +144,7 @@ abstract class JdbcDurableStateSpec(config: Config,
schemaType: SchemaType) exte
}
} else {
whenReady(f.failed) { e =>
- e.getClass.getName shouldEqual
DurableStateExceptionSupport.DeleteRevisionExceptionClass
+ e.getClass shouldEqual classOf[IllegalStateException]
e.getMessage should include("Failed to delete object with
persistenceId [p987] and revision [1]")
}
}
@@ -159,7 +159,9 @@ abstract class JdbcDurableStateSpec(config: Config,
schemaType: SchemaType) exte
_ = g.value shouldBe Some("a valid string")
u <- stateStoreString.upsertObject("p9876", 2, "updated valid
string", "t123")
_ = u shouldBe pekko.Done
- d <- stateStoreString.deleteObject("p9876", 2)
+ // revision 3 = current stored revision (2) + 1, following the same
tombstone-revision
+ // convention used by upsertObject and the pekko-persistence TCK
+ d <- stateStoreString.deleteObject("p9876", 3)
_ = d shouldBe pekko.Done
h <- stateStoreString.getObject("p9876")
@@ -169,6 +171,26 @@ abstract class JdbcDurableStateSpec(config: Config,
schemaType: SchemaType) exte
v.value shouldBe None
}
}
+ "upsert again after deletion" in {
+ whenReady {
+ for {
+ n <- stateStoreString.upsertObject("p11111", 1, "original string",
"t123")
+ _ = n shouldBe pekko.Done
+ // tombstone revision = current stored revision (1) + 1 = 2
+ d <- stateStoreString.deleteObject("p11111", 2)
+ _ = d shouldBe pekko.Done
+ g <- stateStoreString.getObject("p11111")
+ _ = g.value shouldBe None
+ // re-insert: revision = tombstone revision (2) + 1 = 3
+ u <- stateStoreString.upsertObject("p11111", 3, "recreated string",
"t123")
+ _ = u shouldBe pekko.Done
+ h <- stateStoreString.getObject("p11111")
+ } yield h
+ } { v =>
+ v.value shouldBe Some("recreated string")
+ v.revision shouldBe 3L
+ }
+ }
}
"A durable state store with payload that needs custom serializer" must
withActorSystem { implicit system =>
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStoreTCKSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStoreTCKSpec.scala
new file mode 100644
index 00000000..6ce9da6f
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStoreTCKSpec.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.persistence.jdbc.state.scaladsl
+
+import com.typesafe.config.Config
+import org.apache.pekko
+import pekko.persistence.CapabilityFlag
+import pekko.persistence.jdbc.config.SlickConfiguration
+import pekko.persistence.jdbc.db.SlickDatabase
+import pekko.persistence.jdbc.testkit.internal.SchemaType
+import pekko.persistence.jdbc.util.DropCreate
+import pekko.persistence.state.DurableStateStoreSpec
+import org.scalatest.BeforeAndAfterAll
+
+abstract class JdbcDurableStateStoreTCKSpec(config: Config, schemaType:
SchemaType)
+ extends DurableStateStoreSpec(config)
+ with BeforeAndAfterAll
+ with DropCreate {
+
+ override protected def supportsDeleteWithRevisionCheck: CapabilityFlag =
CapabilityFlag.on()
+
+ override protected def supportsUpsertWithRevisionCheck: CapabilityFlag =
CapabilityFlag.on()
+
+ override protected def supportsSoftDelete: CapabilityFlag =
CapabilityFlag.on()
+
+ lazy val db = {
+ val cfg = config.getConfig("jdbc-durable-state-store")
+ if (cfg.hasPath("slick.profile")) {
+ SlickDatabase.database(cfg, new
SlickConfiguration(cfg.getConfig("slick")), "slick.db")
+ } else {
+ SlickDatabase.database(
+ config,
+ new
SlickConfiguration(config.getConfig("pekko-persistence-jdbc.shared-databases.slick")),
+ "pekko-persistence-jdbc.shared-databases.slick.db")
+ }
+ }
+
+ override def beforeAll(): Unit = {
+ dropAndCreate(schemaType)
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ db.close()
+ }
+}
diff --git
a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/DurableStateStoreTCKSpec.scala
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/DurableStateStoreTCKSpec.scala
new file mode 100644
index 00000000..e113e634
--- /dev/null
+++
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/DurableStateStoreTCKSpec.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.persistence.jdbc.integration
+
+import com.typesafe.config.ConfigFactory
+import
org.apache.pekko.persistence.jdbc.state.scaladsl.JdbcDurableStateStoreTCKSpec
+import org.apache.pekko.persistence.jdbc.testkit.internal.{ MariaDB, MySQL,
Oracle, Postgres, SqlServer }
+
+// postgres-application.conf already includes pekko.persistence.state.plugin =
"jdbc-durable-state-store"
+class PostgresDurableStateStoreTCKSpec
+ extends
JdbcDurableStateStoreTCKSpec(ConfigFactory.load("postgres-application.conf"),
Postgres)
+
+// mariadb-application.conf already includes pekko.persistence.state.plugin =
"jdbc-durable-state-store"
+class MariaDBDurableStateStoreTCKSpec
+ extends
JdbcDurableStateStoreTCKSpec(ConfigFactory.load("mariadb-application.conf"),
MariaDB)
+
+object MySQLDurableStateStoreTCKSpec {
+ // mysql-application.conf does not configure the durable state plugin, so
add it here
+ val config = ConfigFactory
+ .parseString("""pekko.persistence.state.plugin =
"jdbc-durable-state-store"""")
+ .withFallback(ConfigFactory.load("mysql-application.conf"))
+}
+
+class MySQLDurableStateStoreTCKSpec
+ extends JdbcDurableStateStoreTCKSpec(MySQLDurableStateStoreTCKSpec.config,
MySQL)
+
+object OracleDurableStateStoreTCKSpec {
+ // oracle-application.conf does not configure the durable state plugin, so
add it here
+ val config = ConfigFactory
+ .parseString("""pekko.persistence.state.plugin =
"jdbc-durable-state-store"""")
+ .withFallback(ConfigFactory.load("oracle-application.conf"))
+}
+
+class OracleDurableStateStoreTCKSpec
+ extends
JdbcDurableStateStoreTCKSpec(OracleDurableStateStoreTCKSpec.config, Oracle)
+
+object SqlServerDurableStateStoreTCKSpec {
+ // sqlserver-application.conf does not configure the durable state plugin,
so add it here
+ val config = ConfigFactory
+ .parseString("""pekko.persistence.state.plugin =
"jdbc-durable-state-store"""")
+ .withFallback(ConfigFactory.load("sqlserver-application.conf"))
+}
+
+class SqlServerDurableStateStoreTCKSpec
+ extends
JdbcDurableStateStoreTCKSpec(SqlServerDurableStateStoreTCKSpec.config,
SqlServer)
diff --git a/project/PekkoCoreDependency.scala
b/project/PekkoCoreDependency.scala
index c127cb61..37903c6d 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
- override val currentVersion: String = "2.0.0-M1"
+ override val currentVersion: String = "2.0.0-M2"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]