This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new e1c2d4d port durable state delete changes (#304)
e1c2d4d is described below
commit e1c2d4df5b59357777f254f8aa9dba19a37efaff
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Mar 23 19:42:48 2026 +0100
port durable state delete changes (#304)
* port durable state delete changes
* Update R2dbcDurableStateStore.scala
* Update DurableStateDao.scala
* Create delete-erffect.excludes
* add tests
* fix 1 test
* Update DurableStateStoreSpec.scala
* remove tests that no longer apply
* Update DurableStateBySliceSpec.scala
* Update DurableStateBySliceSpec.scala
* copyright years
* Fix race condition in emit DeletedDurableState test
Co-authored-by: pjfanning <[email protected]>
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/71ff8fa8-e95a-4390-b012-ee6cbcb1a447
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../delete-erffect.excludes | 25 ++++
.../r2dbc/state/scaladsl/DurableStateDao.scala | 138 +++++++++++++++------
.../state/scaladsl/R2dbcDurableStateStore.scala | 48 +++++--
.../pekko/persistence/r2dbc/TestActors.scala | 5 +
.../r2dbc/state/DurableStateBySliceSpec.scala | 68 +++++++++-
.../r2dbc/state/DurableStateStoreSpec.scala | 78 +++++++++---
docs/src/main/paradox/query.md | 6 +-
.../r2dbc/internal/R2dbcOffsetStore.scala | 13 +-
.../r2dbc/internal/R2dbcProjectionImpl.scala | 11 +-
9 files changed, 317 insertions(+), 75 deletions(-)
diff --git
a/core/src/main/mima-filters/2.0.x.backwards.excludes/delete-erffect.excludes
b/core/src/main/mima-filters/2.0.x.backwards.excludes/delete-erffect.excludes
new file mode 100644
index 0000000..7c370d0
--- /dev/null
+++
b/core/src/main/mima-filters/2.0.x.backwards.excludes/delete-erffect.excludes
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Caused by https://github.com/apache/pekko-persistence-r2dbc/pull/304
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.payload")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow._5")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.copy$default$5")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.copy")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.this")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.DurableStateDao#SerializedStateRow.unapply")
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 415b7fe..96598d4 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -8,7 +8,7 @@
*/
/*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.r2dbc.state.scaladsl
@@ -37,6 +37,7 @@ import pekko.persistence.typed.PersistenceId
import pekko.stream.scaladsl.Source
import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
import io.r2dbc.spi.R2dbcDataIntegrityViolationException
import io.r2dbc.spi.Statement
import org.slf4j.Logger
@@ -54,7 +55,7 @@ import org.slf4j.LoggerFactory
revision: Long,
dbTimestamp: Instant,
readDbTimestamp: Instant,
- payload: Array[Byte],
+ payload: Option[Array[Byte]],
serId: Int,
serManifest: String,
tags: Set[String])
@@ -126,7 +127,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
(slice, entity_type, persistence_id, revision, state_ser_id,
state_ser_manifest, state_payload, tags, db_timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, $transactionTimestampSql)"""
- private val updateStateSql: String = {
+ private def updateStateSql(updateTags: Boolean): String = {
val timestamp =
if (settings.dbTimestampMonotonicIncreasing)
s"$transactionTimestampSql"
@@ -138,14 +139,16 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
if (settings.durableStateAssertSingleWriter) " AND revision = ?"
else ""
+ val tags = if (updateTags) "tags = ?," else ""
+
sql"""
UPDATE $stateTable
- SET revision = ?, state_ser_id = ?, state_ser_manifest = ?,
state_payload = ?, tags = ?, db_timestamp = $timestamp
+ SET revision = ?, state_ser_id = ?, state_ser_manifest = ?,
state_payload = ?, $tags db_timestamp = $timestamp
WHERE persistence_id = ?
$revisionCondition"""
}
- private val deleteStateSql: String =
+ private val hardDeleteStateSql: String =
sql"DELETE from $stateTable WHERE persistence_id = ?"
private val deleteStateWithRevisionSql: String =
@@ -177,7 +180,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
val selectColumns =
if (backtracking)
- "SELECT persistence_id, revision, db_timestamp, statement_timestamp()
AS read_db_timestamp "
+ "SELECT persistence_id, revision, db_timestamp, statement_timestamp()
AS read_db_timestamp, state_ser_id "
else
"SELECT persistence_id, revision, db_timestamp, statement_timestamp()
AS read_db_timestamp, state_ser_id, state_ser_manifest, state_payload "
@@ -203,18 +206,24 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
revision = row.get[java.lang.Long]("revision",
classOf[java.lang.Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = Instant.EPOCH, // not needed here
- payload = row.get("state_payload", classOf[Array[Byte]]),
+ getPayload(row),
serId = row.get[Integer]("state_ser_id", classOf[Integer]),
serManifest = row.get("state_ser_manifest", classOf[String]),
tags = Set.empty // tags not fetched in queries (yet)
))
}
- def writeState(state: SerializedStateRow): Future[Done] = {
- require(state.revision > 0)
+ private def getPayload(row: Row): Option[Array[Byte]] = {
+ val serId = row.get("state_ser_id", classOf[Integer])
+ val rowPayload = row.get("state_payload", classOf[Array[Byte]])
+ if (serId == 0 && (rowPayload == null || rowPayload.isEmpty))
+ None // delete marker
+ else
+ Option(rowPayload)
+ }
- val entityType = PersistenceId.extractEntityType(state.persistenceId)
- val slice = persistenceExt.sliceForPersistenceId(state.persistenceId)
+ def upsertState(state: SerializedStateRow): Future[Done] = {
+ require(state.revision > 0)
def bindTags(stmt: Statement, i: Int): Statement = {
if (state.tags.isEmpty)
@@ -225,6 +234,9 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
val result = {
if (state.revision == 1) {
+ val entityType = PersistenceId.extractEntityType(state.persistenceId)
+ val slice = persistenceExt.sliceForPersistenceId(state.persistenceId)
+
r2dbcExecutor
.updateOne(s"insert [${state.persistenceId}]") { connection =>
val stmt = connection
@@ -235,7 +247,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
.bind(3, state.revision)
.bind(4, state.serId)
.bind(5, state.serManifest)
- .bind(6, state.payload)
+ .bind(6, state.payload.getOrElse(Array.emptyByteArray))
bindTags(stmt, 7)
}
.recoverWith { case _: R2dbcDataIntegrityViolationException =>
@@ -248,11 +260,11 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
r2dbcExecutor.updateOne(s"update [${state.persistenceId}]") {
connection =>
val stmt = connection
- .createStatement(updateStateSql)
+ .createStatement(updateStateSql(updateTags = true))
.bind(0, state.revision)
.bind(1, state.serId)
.bind(2, state.serManifest)
- .bind(3, state.payload)
+ .bind(3, state.payload.getOrElse(Array.emptyByteArray))
bindTags(stmt, 4)
if (settings.dbTimestampMonotonicIncreasing) {
@@ -289,18 +301,18 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
}
}
- def deleteState(persistenceId: String): Future[Done] = {
+ private def hardDeleteState(persistenceId: String): Future[Long] = {
val result =
- r2dbcExecutor.updateOne(s"delete [$persistenceId]") { connection =>
+ r2dbcExecutor.updateOne(s"hard delete [$persistenceId]") { connection =>
connection
- .createStatement(deleteStateSql)
+ .createStatement(hardDeleteStateSql)
.bind(0, persistenceId)
}
if (log.isDebugEnabled())
- result.foreach(_ => log.debug("Deleted durable state for persistenceId
[{}]", persistenceId))
+ result.foreach(_ => log.debug("Hard deleted durable state for
persistenceId [{}]", persistenceId))
- result.map(_ => Done)(ExecutionContext.parasitic)
+ result
}
/**
@@ -310,19 +322,66 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
* @since 1.1.0
*/
def deleteStateForRevision(persistenceId: String, revision: Long):
Future[Long] = {
- val result =
- r2dbcExecutor.updateOne(s"delete [$persistenceId, $revision]") {
connection =>
- connection
- .createStatement(deleteStateWithRevisionSql)
- .bind(0, persistenceId)
- .bind(1, revision)
+ if (revision == 0) {
+ hardDeleteState(persistenceId)
+ } else {
+ val result = {
+ if (revision == 1) {
+ val entityType = PersistenceId.extractEntityType(persistenceId)
+ val slice = persistenceExt.sliceForPersistenceId(persistenceId)
+
+ r2dbcExecutor
+ .updateOne(s"insert delete marker [$persistenceId]") { connection
=>
+ connection
+ .createStatement(insertStateSql)
+ .bind(0, slice)
+ .bind(1, entityType)
+ .bind(2, persistenceId)
+ .bind(3, revision)
+ .bind(4, 0)
+ .bind(5, "")
+ .bind(6, Array.emptyByteArray)
+ .bindNull(7, classOf[Array[String]])
+ }
+ .recoverWith { case _: R2dbcDataIntegrityViolationException =>
+ Future.failed(new IllegalStateException(
+ s"Insert delete marker with revision 1 failed: durable state
for persistence id [$persistenceId] already exists"))
+ }
+ } else {
+ val previousRevision = revision - 1
+
+ r2dbcExecutor.updateOne(s"delete [$persistenceId]") { connection =>
+ val stmt = connection
+ .createStatement(updateStateSql(updateTags = false))
+ .bind(0, revision)
+ .bind(1, 0)
+ .bind(2, "")
+ .bind(3, Array.emptyByteArray)
+
+ if (settings.dbTimestampMonotonicIncreasing) {
+ if (settings.durableStateAssertSingleWriter)
+ stmt
+ .bind(4, persistenceId)
+ .bind(5, previousRevision)
+ else
+ stmt
+ .bind(4, persistenceId)
+ } else {
+ stmt
+ .bind(4, persistenceId)
+ .bind(5, previousRevision)
+ .bind(6, persistenceId)
+
+ if (settings.durableStateAssertSingleWriter)
+ stmt.bind(7, previousRevision)
+ else
+ stmt
+ }
+ }
+ }
}
-
- if (log.isDebugEnabled())
- result.foreach(_ =>
- log.debug("Deleted durable state for persistenceId [{}]; revision
[{}]", persistenceId, revision))
-
- result
+ result
+ }
}
override def currentDbTimestamp(): Future[Instant] = {
@@ -366,24 +425,31 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
stmt
},
row =>
- if (backtracking)
+ if (backtracking) {
+ val serId = row.get("state_ser_id", classOf[Integer])
+ // would have been better with an explicit deleted column as in the
journal table,
+ // but not worth the schema change
+ val isDeleted = serId == 0
+
SerializedStateRow(
persistenceId = row.get("persistence_id", classOf[String]),
- revision = row.get[java.lang.Long]("revision",
classOf[java.lang.Long]),
+ revision = row.get("revision", classOf[Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
- payload = null, // lazy loaded for backtracking
+ // payload = null => lazy loaded for backtracking (ugly, but not
worth changing UpdatedDurableState in Pekko)
+ // payload = None => DeletedDurableState (no lazy loading)
+ payload = if (isDeleted) None else null,
serId = 0,
serManifest = "",
tags = Set.empty // tags not fetched in queries (yet)
)
- else
+ } else
SerializedStateRow(
persistenceId = row.get("persistence_id", classOf[String]),
revision = row.get[java.lang.Long]("revision",
classOf[java.lang.Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
- payload = row.get("state_payload", classOf[Array[Byte]]),
+ payload = getPayload(row),
serId = row.get[Integer]("state_ser_id", classOf[Integer]),
serManifest = row.get("state_ser_manifest", classOf[String]),
tags = Set.empty // tags not fetched in queries (yet)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index c1115eb..d53b479 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -8,7 +8,7 @@
*/
/*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.r2dbc.state.scaladsl
@@ -24,6 +24,7 @@ import pekko.actor.ExtendedActorSystem
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.adapter._
import pekko.persistence.Persistence
+import pekko.persistence.query.DeletedDurableState
import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.Offset
import pekko.persistence.query.TimestampOffset
@@ -65,8 +66,21 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem,
config: Config, cfg
private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]]
= {
val createEnvelope: (TimestampOffset, SerializedStateRow) =>
DurableStateChange[A] = (offset, row) => {
- val payload = serialization.deserialize(row.payload, row.serId,
row.serManifest).get.asInstanceOf[A]
- new UpdatedDurableState(row.persistenceId, row.revision, payload,
offset, row.dbTimestamp.toEpochMilli)
+ row.payload match {
+ case null =>
+ // payload = null => lazy loaded for backtracking (ugly, but not
worth changing UpdatedDurableState in Pekko)
+ new UpdatedDurableState(
+ row.persistenceId,
+ row.revision,
+ null.asInstanceOf[A],
+ offset,
+ row.dbTimestamp.toEpochMilli)
+ case Some(bytes) =>
+ val payload = serialization.deserialize(bytes, row.serId,
row.serManifest).get.asInstanceOf[A]
+ new UpdatedDurableState(row.persistenceId, row.revision, payload,
offset, row.dbTimestamp.toEpochMilli)
+ case None =>
+ new DeletedDurableState(row.persistenceId, row.revision, offset,
row.dbTimestamp.toEpochMilli)
+ }
}
val extractOffset: DurableStateChange[A] => TimestampOffset = env =>
env.offset.asInstanceOf[TimestampOffset]
@@ -79,14 +93,23 @@ class R2dbcDurableStateStore[A](system:
ExtendedActorSystem, config: Config, cfg
stateDao.readState(persistenceId).map {
case None => GetObjectResult(None, 0L)
case Some(serializedRow) =>
- val payload = serialization
- .deserialize(serializedRow.payload, serializedRow.serId,
serializedRow.serManifest)
- .get
- .asInstanceOf[A]
- GetObjectResult(Some(payload), serializedRow.revision)
+ val payload =
+ serializedRow.payload.map { bytes =>
+ serialization
+ .deserialize(bytes, serializedRow.serId,
serializedRow.serManifest)
+ .get
+ .asInstanceOf[A]
+ }
+ GetObjectResult(payload, serializedRow.revision)
}
}
+ /**
+ * Insert the value if `revision` is 1, which will fail with
`IllegalStateException` if there is already a stored
+ * value for the given `persistenceId`. Otherwise update the value, which
will fail with `IllegalStateException` if
+ * the existing stored `revision` + 1 isn't equal to the given `revision`.
This optimistic locking check can be
+ * disabled with configuration `assert-single-writer`.
+ */
override def upsertObject(persistenceId: String, revision: Long, value: A,
tag: String): Future[Done] = {
val valueAnyRef = value.asInstanceOf[AnyRef]
val serialized = serialization.serialize(valueAnyRef).get
@@ -98,16 +121,17 @@ class R2dbcDurableStateStore[A](system:
ExtendedActorSystem, config: Config, cfg
revision,
DurableStateDao.EmptyDbTimestamp,
DurableStateDao.EmptyDbTimestamp,
- serialized,
+ Some(serialized),
serializer.identifier,
manifest,
if (tag.isEmpty) Set.empty else Set(tag))
- stateDao.writeState(serializedRow)
-
+ stateDao.upsertState(serializedRow)
}
+
override def deleteObject(persistenceId: String): Future[Done] =
- stateDao.deleteState(persistenceId)
+ stateDao.deleteStateForRevision(persistenceId, 0L)
+ .map(_ => Done)(ExecutionContext.parasitic)
override def deleteObject(persistenceId: String, revision: Long):
Future[Done] = {
stateDao.deleteStateForRevision(persistenceId, revision).map { count =>
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
index 95fed2f..bf336a0 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
@@ -117,6 +117,7 @@ object TestActors {
sealed trait Command
final case class Persist(payload: Any) extends Command
final case class PersistWithAck(payload: Any, replyTo: ActorRef[Done])
extends Command
+ final case class DeleteWithAck(replyTo: ActorRef[Done]) extends Command
final case class Ping(replyTo: ActorRef[Done]) extends Command
final case class Stop(replyTo: ActorRef[Done]) extends Command
@@ -144,6 +145,10 @@ object TestActors {
pid.id: Object,
(DurableStateBehavior.lastSequenceNumber(context) + 1:
java.lang.Long): Object)
Effect.persist(command.payload).thenRun(_ => command.replyTo !
Done)
+ case command: DeleteWithAck =>
+ context.log
+ .debug("Delete pid [{}], seqNr [{}]", pid.id,
DurableStateBehavior.lastSequenceNumber(context) + 1)
+ Effect.delete[Any]().thenRun(_ => command.replyTo ! Done)
case Ping(replyTo) =>
replyTo ! Done
Effect.none
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
index 5c30435..bb6d588 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
@@ -8,7 +8,7 @@
*/
/*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.r2dbc.state
@@ -24,12 +24,14 @@ import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.testkit.typed.scaladsl.TestProbe
import pekko.actor.typed.ActorSystem
+import pekko.persistence.query.DeletedDurableState
import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.NoOffset
import pekko.persistence.query.Offset
import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.UpdatedDurableState
import pekko.persistence.r2dbc.TestActors
+import pekko.persistence.r2dbc.TestActors.DurableStatePersister.DeleteWithAck
import pekko.persistence.r2dbc.TestActors.DurableStatePersister.Persist
import pekko.persistence.r2dbc.TestActors.DurableStatePersister.PersistWithAck
import pekko.persistence.r2dbc.TestConfig
@@ -175,13 +177,38 @@ class DurableStateBySliceSpec
assertFinished(updatedDurableStateProbe2, withOffsetDone)
killSwitch.shutdown()
}
+
+ "emit DeletedDurableState for latest deleted state" in new Setup {
+ val timeout = 10.seconds
+ for (i <- 1 to 3) {
+ persister ! PersistWithAck(s"s-$i", probe.ref)
+ probe.expectMessage(timeout, Done)
+ }
+
+ // Use store directly to ensure delete is committed before query runs.
+ // (Effect.delete() in Pekko DurableStateBehavior is fire-and-forget:
side effects
+ // like thenRun fire before the DB write completes, creating a race
for Current queries.)
+ query.deleteObject(persistenceId, 4L).futureValue
+
+ val deletedDurableStateProbe =
createTestProbe[DeletedDurableState[String]]()
+
+ val done =
+ doQuery(entityType, slice, slice, NoOffset)
+ .collect { case d: DeletedDurableState[String] => d }
+ .via(killSwitch.flow)
+ .runWith(Sink.foreach(deletedDurableStateProbe.ref.tell))
+
+ deletedDurableStateProbe.receiveMessage(timeout).revision shouldBe 4
+ assertFinished(updatedDurableStateProbe, done)
+ killSwitch.shutdown()
+ }
}
}
// tests just relevant for current query
"Current changesBySlices" should {
"filter states with the same timestamp based on seen sequence nrs" in new
Setup {
- persister ! PersistWithAck(s"s-1", probe.ref)
+ persister ! PersistWithAck("s-1", probe.ref)
probe.expectMessage(Done)
val singleState: UpdatedDurableState[String] =
query
@@ -199,7 +226,7 @@ class DurableStateBySliceSpec
}
"not filter states with the same timestamp based on sequence nrs" in new
Setup {
- persister ! PersistWithAck(s"s-1", probe.ref)
+ persister ! PersistWithAck("s-1", probe.ref)
probe.expectMessage(Done)
val singleState: UpdatedDurableState[String] =
query
@@ -246,6 +273,41 @@ class DurableStateBySliceSpec
killSwitch.shutdown()
}
+ "find delete" in new Setup {
+ for (i <- 1 to 19) {
+ persister ! PersistWithAck(s"s-$i", probe.ref)
+ probe.expectMessage(Done)
+ }
+
+ val deletedDurableStateProbe =
createTestProbe[DeletedDurableState[String]]()
+
+ val done =
+ query
+ .changesBySlices(entityType, slice, slice, NoOffset)
+ .via(killSwitch.flow)
+ .runWith(Sink.foreach {
+ case u: UpdatedDurableState[String] =>
updatedDurableStateProbe.ref.tell(u)
+ case u: DeletedDurableState[String] =>
deletedDurableStateProbe.ref.tell(u)
+ })
+ fishForState("s-19", updatedDurableStateProbe).last.revision shouldBe 19
+
+ persister ! DeleteWithAck(probe.ref)
+ probe.expectMessage(Done)
+ deletedDurableStateProbe.receiveMessage().revision shouldBe 20
+
+ for (i <- 21 to 40) {
+ persister ! PersistWithAck(s"s-$i", probe.ref)
+ probe.expectMessage(Done)
+ }
+ fishForState("s-40", updatedDurableStateProbe).last.revision shouldBe 40
+
+ persister ! DeleteWithAck(probe.ref)
+ probe.expectMessage(Done)
+ deletedDurableStateProbe.receiveMessage().revision shouldBe 41
+
+ killSwitch.shutdown()
+ }
+
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
index 7fb9841..98d63a1 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
@@ -8,7 +8,7 @@
*/
/*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.r2dbc.state
@@ -114,31 +114,79 @@ class DurableStateStoreSpec
store.getObject(persistenceId).futureValue should
be(GetObjectResult(None, 0L))
}
- "support deletions with revision" in {
+ "hard delete when revision=0" in {
val entityType = nextEntityType()
val persistenceId = PersistenceId(entityType,
"to-be-added-and-removed").id
val value = "Genuinely Collaborative"
store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
- store.deleteObject(persistenceId, 1L).futureValue
+ store.deleteObject(persistenceId, revision = 0).futureValue
store.getObject(persistenceId).futureValue should
be(GetObjectResult(None, 0L))
}
- "fail deleteObject call when revision is unknown" in {
+ "delete payload but keep revision" in {
val entityType = nextEntityType()
val persistenceId = PersistenceId(entityType,
"to-be-added-and-removed").id
+ val value1 = "value1"
+ store.upsertObject(persistenceId, 1L, value1, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value1), 1L))
+ store.deleteObject(persistenceId, revision = 2L).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(None, 2L))
+
+ val value2 = "value2"
+ store.upsertObject(persistenceId, 3L, value2, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value2), 3L))
+ }
+
+ "update revision when deleting" in {
+ val entityType = nextEntityType()
+ val persistenceId = PersistenceId(entityType, "to-be-removed").id
+
+ store.deleteObject(persistenceId, revision = 1L).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(None, 1L))
+ store.deleteObject(persistenceId, revision = 2L).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(None, 2L))
+
+ val value1 = "value1"
+ store.upsertObject(persistenceId, 3L, value1, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value1), 3L))
+
+ store.deleteObject(persistenceId, revision = 4L).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(None, 4L))
+ store.deleteObject(persistenceId, revision = 5L).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(None, 5L))
+ }
+
+ "detect and reject concurrent delete of revision 1" in {
+ val entityType = nextEntityType()
+ val persistenceId = PersistenceId(entityType,
"id-to-be-deleted-concurrently")
val value = "Genuinely Collaborative"
- store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
- store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
- if (pekko.Version.current.startsWith("1.0")) {
- store.deleteObject(persistenceId, 2L).futureValue
- } else {
- val ex = intercept[Exception] {
- Await.result(store.deleteObject(persistenceId, 2L), 20.seconds)
- }
- ex.getClass.getName shouldEqual
DurableStateExceptionSupport.DeleteRevisionExceptionClass
- }
- store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
+ store.upsertObject(persistenceId.id, revision = 1L, value,
entityType).futureValue
+ store.getObject(persistenceId.id).futureValue should
be(GetObjectResult(Some(value), 1L))
+
+ val failure =
+ store.deleteObject(persistenceId.id, revision = 1L).failed.futureValue
+ failure.getMessage should include(
+ s"Insert delete marker with revision 1 failed: durable state for
persistence id [${persistenceId.id}] already exists")
+ }
+
+ "detect and reject concurrent deletes" in {
+ val entityType = nextEntityType()
+ val persistenceId = PersistenceId(entityType,
"id-to-be-updated-concurrently")
+ val value = "Genuinely Collaborative"
+ store.upsertObject(persistenceId.id, revision = 1L, value,
entityType).futureValue
+ store.getObject(persistenceId.id).futureValue should
be(GetObjectResult(Some(value), 1L))
+
+ val updatedValue = "Open to Feedback"
+ store.upsertObject(persistenceId.id, revision = 2L, updatedValue,
entityType).futureValue
+ store.getObject(persistenceId.id).futureValue should
be(GetObjectResult(Some(updatedValue), 2L))
+
+ // simulate a delete by a different node that didn't see the latest update (revision 2):
+ val updatedValue2 = "Genuine and Sincere in all Communications"
+ val failure =
+ store.deleteObject(persistenceId.id, revision = 2L).failed.futureValue
+ failure.getMessage should include(
+ s"Failed to delete object with persistenceId [${persistenceId.id}] and
revision [2]")
}
}
diff --git a/docs/src/main/paradox/query.md b/docs/src/main/paradox/query.md
index e4ba5c9..a560c10 100644
--- a/docs/src/main/paradox/query.md
+++ b/docs/src/main/paradox/query.md
@@ -135,13 +135,17 @@ Java
Scala
: @@snip
[create](/docs/src/test/scala/docs/home/query/QueryDocCompileOnly.scala) {
#currentChangesBySlices }
-The emitted `DurableStateChange` can be a `UpdatedDurableState` or
`DeletedDurableState`, but `DeletedDurableState` is not implemented yet.
+The emitted `DurableStateChange` can be a `UpdatedDurableState` or
`DeletedDurableState`.
It will emit an `UpdatedDurableState` when the durable state is updated. When
the state is updated again another
`UpdatedDurableState` is emitted. It will always emit an `UpdatedDurableState`
for the latest revision of the state,
but there is no guarantee that all intermediate changes are emitted if the
state is updated several times. Note that
`UpdatedDurableState` contains the full current state, and it is not a delta
from previous revision of state.
+It will emit a `DeletedDurableState` when the durable state is deleted. When
the state is updated again a new
+`UpdatedDurableState` is emitted. There is no guarantee that all intermediate
changes are emitted if the state is
+updated or deleted several times.
+
`changesBySlices` should be used via @ref:[R2dbcProjection](projection.md),
which will automatically handle the similar difficulties
with duplicates as described for @ref[eventsBySlices](#eventsbyslices). When
using `R2dbcProjection` the changes
will be delivered in revision number order without duplicates.
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index 20ae6bc..c85c50f 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -8,7 +8,7 @@
*/
/*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.projection.r2dbc.internal
@@ -30,6 +30,7 @@ import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.LoggerOps
import pekko.annotation.InternalApi
import pekko.persistence.Persistence
+import pekko.persistence.query.DeletedDurableState
import pekko.persistence.query.DurableStateChange
import pekko.persistence.query.Offset
import pekko.persistence.query.TimestampOffset
@@ -1021,8 +1022,16 @@ private[projection] class R2dbcOffsetStore(
timestampOffset,
strictSeqNr = false,
envelopeLoaded = change.value != null))
+ case change: DeletedDurableState[_] if
change.offset.isInstanceOf[TimestampOffset] =>
+ val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
+ Some(
+ RecordWithOffset(
+ Record(change.persistenceId, change.revision,
timestampOffset.timestamp),
+ timestampOffset,
+ strictSeqNr = false,
+ envelopeLoaded = true))
case change: DurableStateChange[_] if
change.offset.isInstanceOf[TimestampOffset] =>
- // FIXME case DeletedDurableState when that is added
+ // in case additional types are added
throw new IllegalArgumentException(
s"DurableStateChange [${change.getClass.getName}] not implemented
yet. Please report bug at
https://github.com/apache/pekko-persistence-r2dbc/issues")
case _ => None
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index 4a96fd0..0ee478d 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -8,7 +8,7 @@
*/
/*
- * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ * Copyright (C) 2021-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.projection.r2dbc.internal
@@ -28,6 +28,7 @@ import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.event.Logging
import pekko.event.LoggingAdapter
+import pekko.persistence.query.DeletedDurableState
import pekko.persistence.query.UpdatedDurableState
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.query.typed.scaladsl.LoadEventQuery
@@ -122,7 +123,6 @@ private[projection] object R2dbcProjectionImpl {
case upd: UpdatedDurableState[_] if upd.value == null =>
val pid = upd.persistenceId
- val revision = upd.revision
(sourceProvider match {
case store: DurableStateStore[_] =>
store.getObject(pid)
@@ -146,10 +146,9 @@ private[projection] object R2dbcProjectionImpl {
count: java.lang.Long)
new UpdatedDurableState(pid, loadedRevision, loadedValue,
upd.offset, upd.timestamp)
.asInstanceOf[Envelope]
- case GetObjectResult(None, _) =>
- // FIXME use DeletedDurableState here when that is added
- throw new IllegalStateException(
- s"Durable state not found when loaded lazily, persistenceId
[$pid], revision [$revision]")
+ case GetObjectResult(None, loadedRevision) =>
+ new DeletedDurableState(pid, loadedRevision, upd.offset,
upd.timestamp)
+ .asInstanceOf[Envelope]
}
case _ =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]