This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 7d971d6 add AdditionalColumn, ChangeHandler, R2dbcSession, custom
table support (#377)
7d971d6 is described below
commit 7d971d6d8deb05d4a93b60bc71f057757165f807
Author: PJ Fanning <[email protected]>
AuthorDate: Tue May 19 11:43:48 2026 +0100
add AdditionalColumn, ChangeHandler, R2dbcSession, custom table support
(#377)
* Add AdditionalColumn, ChangeHandler, R2dbcSession, custom table support
for DurableState
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/ae97f85d-4a63-4ca1-b462-b704e7fba5aa
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* Create query-representation.excludes
* Update DurableStateDao.scala
* Update R2dbcDurableStateStore.scala
* Delete DurableStateExceptionSupport.scala
* Fix readState to use entity-type-specific custom table
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/ff2c785b-7373-4279-b8a1-581cea468a76
Co-authored-by: pjfanning <[email protected]>
* try to fix build
* Update DurableStateStoreSpec.scala
* Update DurableStateStoreSpec.scala
* Update DurableStateStoreAdditionalColumnSpec.scala
* Update DurableStateStoreAdditionalColumnSpec.scala
* Update DurableStateStoreAdditionalColumnSpec.scala
* Fix MySQL test failures: skip AdditionalColumn tests with Pending, fix
ChangeHandler SQL params
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/0d52c7a6-7b42-4abc-be24-e801dd646522
Co-authored-by: pjfanning <[email protected]>
* Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
* class name
* support boolean mapping
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---
.../query-representation.excludes | 23 +
core/src/main/resources/reference.conf | 24 ++
.../pekko/persistence/r2dbc/R2dbcSettings.scala | 40 ++
.../cleanup/scaladsl/DurableStateCleanup.scala | 4 +-
.../r2dbc/internal/AdditionalColumnFactory.scala | 98 +++++
.../r2dbc/internal/ChangeHandlerFactory.scala | 88 ++++
.../r2dbc/session/javadsl/R2dbcSession.scala | 78 ++++
.../r2dbc/session/scaladsl/R2dbcSession.scala | 74 ++++
.../r2dbc/state/ChangeHandlerException.scala | 19 +
.../r2dbc/state/javadsl/AdditionalColumn.scala | 64 +++
.../r2dbc/state/javadsl/ChangeHandler.scala | 44 ++
.../r2dbc/state/scaladsl/AdditionalColumn.scala | 68 +++
.../r2dbc/state/scaladsl/ChangeHandler.scala | 44 ++
.../r2dbc/state/scaladsl/DurableStateDao.scala | 470 ++++++++++++++++-----
.../scaladsl/DurableStateExceptionSupport.scala | 48 ---
.../state/scaladsl/R2dbcDurableStateStore.scala | 65 +--
.../r2dbc/state/JavadslChangeHandler.java | 56 +++
.../persistence/r2dbc/state/JavadslColumn.java | 38 ++
.../state/CurrentPersistenceIdsQuerySpec.scala | 75 +++-
.../DurableStateStoreAdditionalColumnSpec.scala | 235 +++++++++++
.../state/DurableStateStoreChangeHandlerSpec.scala | 212 ++++++++++
.../r2dbc/state/DurableStateStoreSpec.scala | 4 +-
docs/src/main/paradox/projection.md | 2 +-
23 files changed, 1696 insertions(+), 177 deletions(-)
diff --git
a/core/src/main/mima-filters/2.0.x.backwards.excludes/query-representation.excludes
b/core/src/main/mima-filters/2.0.x.backwards.excludes/query-representation.excludes
new file mode 100644
index 0000000..a08d2e3
--- /dev/null
+++
b/core/src/main/mima-filters/2.0.x.backwards.excludes/query-representation.excludes
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Caused by https://github.com/apache/pekko-persistence-r2dbc/pull/377
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore#PersistenceIdsQueryState.copy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore#PersistenceIdsQueryState.this")
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore$PersistenceIdsQueryState$")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore#PersistenceIdsQueryState.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore#PersistenceIdsQueryState.unapply")
diff --git a/core/src/main/resources/reference.conf
b/core/src/main/resources/reference.conf
index ecd8a7f..60b553b 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -95,6 +95,30 @@ pekko.persistence.r2dbc {
# this is disabled.
assert-single-writer = on
+ # Extract a field from the state and store in an additional database
column.
+ # Primary use case is for secondary indexes that can be queried.
+ # Each entity type can have several additional columns.
+ # The AdditionalColumn implementation may optionally define an ActorSystem
+ # constructor parameter.
+ additional-columns {
+ #"<entity-type-name>" = ["<fqcn of AdditionalColumn implementation>"]
+ }
+
+ # Use another table for the given entity types. Typically used together
with
+ # additional-columns but can also be used without additional-columns.
+ custom-table {
+ #"<entity-type-name>" = <other_durable_state_table>
+ }
+
+ # Additional processing in the same transaction as the Durable State upsert
+ # or delete. Primary use case is for storing a query or aggregate
representation
+ # in a separate table.
+ # The ChangeHandler implementation may optionally define an ActorSystem
+ # constructor parameter.
+ change-handler {
+ #"<entity-type-name>" = "<fqcn of ChangeHandler implementation>"
+ }
+
dialect = ${pekko.persistence.r2dbc.dialect}
schema = ${pekko.persistence.r2dbc.schema}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index 93402a6..e641792 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -15,7 +15,9 @@ package org.apache.pekko.persistence.r2dbc
import java.util.Locale
+import scala.collection.immutable
import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import org.apache.pekko
@@ -115,6 +117,44 @@ final class StateSettings(val config: Config) extends
ConnectionSettings with Us
val durableStatePayloadCodec: PayloadCodec =
if (useJsonPayload("payload-column-type")) PayloadCodec.JsonCodec else
PayloadCodec.ByteArrayCodec
+
+ private val durableStateTableByEntityType: Map[String, String] = {
+ val cfg = config.getConfig("custom-table")
+ cfg.root.unwrapped.asScala.toMap.map { case (k, v) => k -> v.toString }
+ }
+
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[pekko] val durableStateTableByEntityTypeWithSchema:
Map[String, String] =
+ durableStateTableByEntityType.map { case (entityType, table) =>
+ entityType -> (schema.map(_ + ".").getOrElse("") + table)
+ }
+
+ def getDurableStateTable(entityType: String): String =
+ durableStateTableByEntityType.getOrElse(entityType, durableStateTable)
+
+ def getDurableStateTableWithSchema(entityType: String): String =
+ durableStateTableByEntityTypeWithSchema.getOrElse(entityType,
durableStateTableWithSchema)
+
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[pekko] val durableStateAdditionalColumnClasses:
Map[String, immutable.IndexedSeq[String]] = {
+ val cfg = config.getConfig("additional-columns")
+ cfg.root.unwrapped.asScala.toMap.map {
+ case (k, v: java.util.List[_]) => k ->
v.iterator.asScala.map(_.toString).toVector
+ case (k, v) => k -> Vector(v.toString)
+ }
+ }
+
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[pekko] val durableStateChangeHandlerClasses:
Map[String, String] = {
+ val cfg = config.getConfig("change-handler")
+ cfg.root.unwrapped.asScala.toMap.map { case (k, v) => k -> v.toString }
+ }
}
/**
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
index 251496b..fc25677 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala
@@ -81,7 +81,7 @@ final class DurableStateCleanup(systemProvider:
ClassicActorSystemProvider, conf
def deleteState(persistenceId: String, resetRevisionNumber: Boolean):
Future[Done] = {
if (resetRevisionNumber) {
stateDao
- .deleteStateForRevision(persistenceId, revision = 0L)
+ .deleteState(persistenceId, revision = 0L)
.map(_ => Done)(ExecutionContext.parasitic)
} else {
stateDao.readState(persistenceId).flatMap {
@@ -89,7 +89,7 @@ final class DurableStateCleanup(systemProvider:
ClassicActorSystemProvider, conf
Future.successful(Done) // already deleted
case Some(s) =>
stateDao
- .deleteStateForRevision(persistenceId, s.revision + 1)
+ .deleteState(persistenceId, s.revision + 1)
.map(_ => Done)(ExecutionContext.parasitic)
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/AdditionalColumnFactory.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/AdditionalColumnFactory.scala
new file mode 100644
index 0000000..fb1bed2
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/AdditionalColumnFactory.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import scala.util.Try
+
+import org.apache.pekko
+import pekko.actor.{ ActorSystem => ClassicActorSystem }
+import pekko.actor.ExtendedActorSystem
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.state.scaladsl.AdditionalColumn
+import pekko.persistence.r2dbc.state.{ javadsl => javadslState }
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object AdditionalColumnFactory {
+
+ /**
+ * Adapter from javadsl.AdditionalColumn to scaladsl.AdditionalColumn
+ */
+ final class AdditionalColumnAdapter(delegate:
javadslState.AdditionalColumn[Any, Any])
+ extends AdditionalColumn[Any, Any] {
+
+ override private[pekko] val fieldClass: Class[_] =
+ delegate.fieldClass
+
+ override def columnName: String =
+ delegate.columnName
+
+ override def bind(upsert: AdditionalColumn.Upsert[Any]):
AdditionalColumn.Binding[Any] = {
+ val javadslUpsert = new javadslState.AdditionalColumn.Upsert[Any](
+ upsert.persistenceId,
+ upsert.entityType,
+ upsert.slice,
+ upsert.revision,
+ upsert.value)
+ delegate.bind(javadslUpsert) match {
+ case bindValue: javadslState.AdditionalColumn.BindValue[_] =>
AdditionalColumn.BindValue(bindValue.value)
+ case javadslState.AdditionalColumn.BindNull =>
AdditionalColumn.BindNull
+ case javadslState.AdditionalColumn.Skip =>
AdditionalColumn.Skip
+ }
+ }
+
+ }
+
+ def create(system: ActorSystem[_], fqcn: String): AdditionalColumn[Any, Any]
= {
+ val dynamicAccess =
system.classicSystem.asInstanceOf[ExtendedActorSystem].dynamicAccess
+
+ def tryCreateScaladslInstance(): Try[AdditionalColumn[Any, Any]] = {
+ dynamicAccess
+ .createInstanceFor[AdditionalColumn[Any, Any]](fqcn, Nil)
+ .orElse(
+ dynamicAccess
+ .createInstanceFor[AdditionalColumn[Any, Any]](fqcn,
List(classOf[ActorSystem[_]] -> system))
+ .orElse(dynamicAccess.createInstanceFor[AdditionalColumn[Any,
Any]](
+ fqcn,
+ List(classOf[ClassicActorSystem] -> system.classicSystem))))
+ }
+
+ def tryCreateJavadslInstance(): Try[javadslState.AdditionalColumn[Any,
Any]] = {
+ dynamicAccess
+ .createInstanceFor[javadslState.AdditionalColumn[Any, Any]](fqcn, Nil)
+ .orElse(
+ dynamicAccess
+ .createInstanceFor[javadslState.AdditionalColumn[Any, Any]](
+ fqcn,
+ List(classOf[ActorSystem[_]] -> system))
+
.orElse(dynamicAccess.createInstanceFor[javadslState.AdditionalColumn[Any,
Any]](
+ fqcn,
+ List(classOf[ClassicActorSystem] -> system.classicSystem))))
+ }
+
+ def adapt(javadslColumn: javadslState.AdditionalColumn[Any, Any]):
AdditionalColumn[Any, Any] =
+ new AdditionalColumnAdapter(javadslColumn)
+
+ tryCreateScaladslInstance()
+ .orElse(tryCreateJavadslInstance().map(adapt))
+ .getOrElse(
+ throw new IllegalArgumentException(
+ s"Additional column [$fqcn] must implement " +
+ s"[${classOf[AdditionalColumn[_, _]].getName}] or
[${classOf[javadslState.AdditionalColumn[_, _]].getName}]. It " +
+ s"may have an ActorSystem constructor parameter."))
+ }
+
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ChangeHandlerFactory.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ChangeHandlerFactory.scala
new file mode 100644
index 0000000..ca15cee
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/ChangeHandlerFactory.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import scala.jdk.FutureConverters._
+import scala.concurrent.Future
+import scala.util.Try
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ExtendedActorSystem
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.InternalApi
+import pekko.persistence.query.DurableStateChange
+import pekko.persistence.r2dbc.session.scaladsl.R2dbcSession
+import pekko.persistence.r2dbc.state.{ javadsl => javadslState }
+import pekko.persistence.r2dbc.state.scaladsl.ChangeHandler
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object ChangeHandlerFactory {
+
+ /**
+ * Adapter from javadsl.ChangeHandler to scaladsl.ChangeHandler
+ */
+ final class ChangeHandlerAdapter(delegate: javadslState.ChangeHandler[Any])
extends ChangeHandler[Any] {
+ override def process(session: R2dbcSession, change:
DurableStateChange[Any]): Future[Done] = {
+ val javadslSession =
+ new
pekko.persistence.r2dbc.session.javadsl.R2dbcSession(session.connection)(session.ec,
session.system)
+ delegate.process(javadslSession, change).asScala
+ }
+ }
+
+ def create(system: ActorSystem[_], fqcn: String): ChangeHandler[Any] = {
+ val dynamicAccess =
system.classicSystem.asInstanceOf[ExtendedActorSystem].dynamicAccess
+
+ def tryCreateScaladslInstance(): Try[ChangeHandler[Any]] = {
+ dynamicAccess
+ .createInstanceFor[ChangeHandler[Any]](fqcn, Nil)
+ .orElse(
+ dynamicAccess
+ .createInstanceFor[ChangeHandler[Any]](fqcn,
List(classOf[ActorSystem[_]] -> system))
+ .orElse(
+ dynamicAccess
+ .createInstanceFor[ChangeHandler[Any]](
+ fqcn,
+ List(classOf[pekko.actor.ActorSystem] ->
system.classicSystem))))
+ }
+
+ def tryCreateJavadslInstance(): Try[javadslState.ChangeHandler[Any]] = {
+ dynamicAccess
+ .createInstanceFor[javadslState.ChangeHandler[Any]](fqcn, Nil)
+ .orElse(
+ dynamicAccess
+ .createInstanceFor[javadslState.ChangeHandler[Any]](fqcn,
List(classOf[ActorSystem[_]] -> system))
+ .orElse(
+ dynamicAccess
+ .createInstanceFor[javadslState.ChangeHandler[Any]](
+ fqcn,
+ List(classOf[pekko.actor.ActorSystem] ->
system.classicSystem))))
+ }
+
+ def adapt(changeHandler: javadslState.ChangeHandler[Any]):
ChangeHandler[Any] =
+ new ChangeHandlerAdapter(changeHandler)
+
+ tryCreateScaladslInstance()
+ .orElse(tryCreateJavadslInstance().map(adapt))
+ .getOrElse(
+ throw new IllegalArgumentException(
+ s"Change handler [$fqcn] must implement " +
+ s"[${classOf[ChangeHandler[_]].getName}] or
[${classOf[javadslState.ChangeHandler[_]].getName}]. It " +
+ s"may have an ActorSystem constructor parameter."))
+
+ }
+
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/session/javadsl/R2dbcSession.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/session/javadsl/R2dbcSession.scala
new file mode 100644
index 0000000..d95f36a
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/session/javadsl/R2dbcSession.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.session.javadsl
+
+import java.util.Optional
+import java.util.concurrent.CompletionStage
+import java.util.function.{ Function => JFunction }
+
+import scala.jdk.CollectionConverters._
+import scala.jdk.FutureConverters._
+import scala.jdk.OptionConverters._
+import scala.concurrent.ExecutionContext
+
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.ApiMayChange
+import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.session.{ scaladsl => scaladslSession }
+import io.r2dbc.spi.Connection
+import io.r2dbc.spi.Row
+import io.r2dbc.spi.Statement
+
+@ApiMayChange
+object R2dbcSession {
+
+ /**
+ * Runs the passed function using a R2dbcSession with a new transaction. The
connection is closed and the transaction
+ * is committed at the end or rolled back in case of failures.
+ */
+ def withSession[A](system: ActorSystem[_], fun: JFunction[R2dbcSession,
CompletionStage[A]]): CompletionStage[A] = {
+ withSession(system, "pekko.persistence.r2dbc.connection-factory", fun)
+ }
+
+ def withSession[A](
+ system: ActorSystem[_],
+ connectionFactoryConfigPath: String,
+ fun: JFunction[R2dbcSession, CompletionStage[A]]): CompletionStage[A] = {
+ scaladslSession.R2dbcSession.withSession(system,
connectionFactoryConfigPath) { scaladslSession =>
+ val javadslSession = new
R2dbcSession(scaladslSession.connection)(system.executionContext, system)
+ fun(javadslSession).asScala
+ }
+ }.asJava
+
+}
+
+@ApiMayChange
+final class R2dbcSession(val connection: Connection)(implicit ec:
ExecutionContext, system: ActorSystem[_]) {
+
+ def createStatement(sql: String): Statement =
+ connection.createStatement(sql)
+
+ def updateOne(statement: Statement): CompletionStage[java.lang.Long] =
+
R2dbcExecutor.updateOneInTx(statement).map(java.lang.Long.valueOf)(ExecutionContext.parasitic).asJava
+
+ def update(statements: java.util.List[Statement]):
CompletionStage[java.util.List[java.lang.Long]] =
+ R2dbcExecutor
+ .updateInTx(statements.asScala.toVector)
+ .map(results => results.map(java.lang.Long.valueOf).asJava)
+ .asJava
+
+ def selectOne[A](statement: Statement)(mapRow: Row => A):
CompletionStage[Optional[A]] =
+ R2dbcExecutor.selectOneInTx(statement,
mapRow).map(_.toJava)(ExecutionContext.parasitic).asJava
+
+ def select[A](statement: Statement)(mapRow: Row => A):
CompletionStage[java.util.List[A]] =
+ R2dbcExecutor.selectInTx(statement, mapRow).map(_.asJava).asJava
+
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/session/scaladsl/R2dbcSession.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/session/scaladsl/R2dbcSession.scala
new file mode 100644
index 0000000..03e639c
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/session/scaladsl/R2dbcSession.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.session.scaladsl
+
+import scala.collection.immutable
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.annotation.ApiMayChange
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
+import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import io.r2dbc.spi.Connection
+import io.r2dbc.spi.Row
+import io.r2dbc.spi.Statement
+import org.slf4j.LoggerFactory
+
+@ApiMayChange
+object R2dbcSession {
+ private val log = LoggerFactory.getLogger(classOf[R2dbcSession])
+
+ private val logDbCallsDisabled = -1.millis
+
+ /**
+ * Runs the passed function using a R2dbcSession with a new transaction. The
connection is closed and the transaction
+ * is committed at the end or rolled back in case of failures.
+ */
+ def withSession[A](system: ActorSystem[_])(fun: R2dbcSession => Future[A]):
Future[A] = {
+ withSession(system, "pekko.persistence.r2dbc.connection-factory")(fun)
+ }
+
+ def withSession[A](system: ActorSystem[_], connectionFactoryConfigPath:
String)(
+ fun: R2dbcSession => Future[A]): Future[A] = {
+ val connectionFactory =
ConnectionFactoryProvider(system).connectionFactoryFor(connectionFactoryConfigPath)
+ val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log,
logDbCallsDisabled)(system.executionContext, system)
+ r2dbcExecutor.withConnection("R2dbcSession") { connection =>
+ val session = new R2dbcSession(connection)(system.executionContext,
system)
+ fun(session)
+ }
+ }
+}
+
+@ApiMayChange
+final class R2dbcSession(val connection: Connection)(implicit val ec:
ExecutionContext, val system: ActorSystem[_]) {
+
+ def createStatement(sql: String): Statement =
+ connection.createStatement(sql)
+
+ def updateOne(statement: Statement): Future[Long] =
+ R2dbcExecutor.updateOneInTx(statement)
+
+ def update(statements: immutable.IndexedSeq[Statement]):
Future[immutable.IndexedSeq[Long]] =
+ R2dbcExecutor.updateInTx(statements)
+
+ def selectOne[A](statement: Statement)(mapRow: Row => A): Future[Option[A]] =
+ R2dbcExecutor.selectOneInTx(statement, mapRow)
+
+ def select[A](statement: Statement)(mapRow: Row => A):
Future[immutable.IndexedSeq[A]] =
+ R2dbcExecutor.selectInTx(statement, mapRow)
+
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/ChangeHandlerException.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/ChangeHandlerException.scala
new file mode 100644
index 0000000..66dbfdc
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/ChangeHandlerException.scala
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.state
+
+import org.apache.pekko.annotation.ApiMayChange
+
+@ApiMayChange
+final class ChangeHandlerException(message: String, cause: Throwable) extends
RuntimeException(message, cause)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/AdditionalColumn.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/AdditionalColumn.scala
new file mode 100644
index 0000000..7c2a182
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/AdditionalColumn.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.state.javadsl
+
+import org.apache.pekko
+import pekko.annotation.ApiMayChange
+import pekko.annotation.InternalApi
+
+@ApiMayChange
+object AdditionalColumn {
+ final case class Upsert[A](persistenceId: String, entityType: String, slice:
Int, revision: Long, value: A)
+
+ sealed trait Binding[+B]
+
+ def bindValue[B](value: B): Binding[B] = new BindValue(value)
+
+ def bindNull[B]: Binding[B] = BindNull
+
+ def skip[B]: Binding[B] = Skip
+
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[pekko] final class BindValue[B](val value: B) extends
Binding[B]
+
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[pekko] case object BindNull extends Binding[Nothing]
+
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[pekko] case object Skip extends Binding[Nothing]
+
+}
+
+/**
+ * @tparam A
+ * The type of the durable state
+ * @tparam B
+ * The type of the field stored in the additional column.
+ */
+@ApiMayChange
+abstract class AdditionalColumn[A, B] {
+
+ def fieldClass: Class[B]
+
+ def columnName: String
+
+ def bind(upsert: AdditionalColumn.Upsert[A]): AdditionalColumn.Binding[B]
+
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/ChangeHandler.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/ChangeHandler.scala
new file mode 100644
index 0000000..c2e9eb9
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/javadsl/ChangeHandler.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.state.javadsl
+
+import java.util.concurrent.CompletionStage
+
+import org.apache.pekko
+import pekko.Done
+import pekko.annotation.ApiMayChange
+import pekko.persistence.query.DurableStateChange
+import pekko.persistence.r2dbc.session.javadsl.R2dbcSession
+
+@ApiMayChange
+trait ChangeHandler[A] {
+
+ /**
+ * Implement this method to perform additional processing in the same
transaction as the Durable State upsert or
+ * delete.
+ *
+ * The `process` method is invoked for each `DurableStateChange`. Each time
a new `Connection` is passed with a new
+ * open transaction. You can use `createStatement`, `update` and other
methods provided by the [[R2dbcSession]]. The
+ * results of several statements can be combined with `CompletionStage`
composition (e.g. `thenCompose`). The
+ * transaction will be automatically committed or rolled back when the
returned `CompletionStage` is completed. Note
+ * that an exception here will abort the transaction and fail the upsert or
delete.
+ *
+ * The `ChangeHandler` should be implemented as a stateless function without
mutable state because the same
+ * `ChangeHandler` instance may be invoked concurrently for different
entities. For a specific entity (persistenceId)
+ * one change is processed at a time and this `process` method will not be
invoked with the next change for that
+ * entity until after the returned `CompletionStage` is completed.
+ */
+ def process(session: R2dbcSession, change: DurableStateChange[A]):
CompletionStage[Done]
+
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/AdditionalColumn.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/AdditionalColumn.scala
new file mode 100644
index 0000000..09ee2f7
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/AdditionalColumn.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.state.scaladsl
+
+import scala.reflect.ClassTag
+
+import org.apache.pekko
+import pekko.annotation.ApiMayChange
+import pekko.annotation.InternalApi
+
+@ApiMayChange
+object AdditionalColumn {
+ final case class Upsert[A](persistenceId: String, entityType: String, slice:
Int, revision: Long, value: A)
+
+ sealed trait Binding[+B]
+
+ final case class BindValue[B](value: B) extends Binding[B]
+
+ case object BindNull extends Binding[Nothing]
+
+ case object Skip extends Binding[Nothing]
+
+ private val scalaPrimitivesMapping: Map[Class[_], Class[_]] =
+ Map(
+ classOf[Int] -> classOf[java.lang.Integer],
+ classOf[Long] -> classOf[java.lang.Long],
+ classOf[Float] -> classOf[java.lang.Float],
+ classOf[Double] -> classOf[java.lang.Double],
+ classOf[Byte] -> classOf[java.lang.Byte],
+ classOf[Short] -> classOf[java.lang.Short],
+ classOf[Char] -> classOf[java.lang.Character],
+ classOf[Boolean] -> classOf[java.lang.Boolean])
+}
+
+/**
+ * @tparam A
+ * The type of the durable state
+ * @tparam B
+ * The type of the field stored in the additional column.
+ */
+@ApiMayChange
+abstract class AdditionalColumn[A, B: ClassTag] {
+ import AdditionalColumn.scalaPrimitivesMapping
+
+ /**
+ * INTERNAL API: used when binding null
+ */
+ @InternalApi private[pekko] val fieldClass: Class[_] = {
+ val cls = implicitly[ClassTag[B]].runtimeClass
+ scalaPrimitivesMapping.getOrElse(cls, cls)
+ }
+
+ def columnName: String
+
+ def bind(upsert: AdditionalColumn.Upsert[A]): AdditionalColumn.Binding[B]
+
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/ChangeHandler.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/ChangeHandler.scala
new file mode 100644
index 0000000..0744e74
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/ChangeHandler.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.state.scaladsl
+
+import scala.concurrent.Future
+
+import org.apache.pekko
+import pekko.Done
+import pekko.annotation.ApiMayChange
+import pekko.persistence.query.DurableStateChange
+import pekko.persistence.r2dbc.session.scaladsl.R2dbcSession
+
+@ApiMayChange
+trait ChangeHandler[A] {
+
+ /**
+ * Implement this method to perform additional processing in the same
transaction as the Durable State upsert or
+ * delete.
+ *
+ * The `process` method is invoked for each `DurableStateChange`. Each time
a new `Connection` is passed with a new
+ * open transaction. You can use `createStatement`, `update` and other
methods provided by the [[R2dbcSession]]. The
+ * results of several statements can be combined with `Future` composition.
The transaction will be automatically
+ * committed or rolled back when the returned `Future` is completed. Note
that an exception here will abort the
+ * transaction and fail the upsert or delete.
+ *
+ * The `ChangeHandler` should be implemented as a stateless function without
mutable state because the same
+ * `ChangeHandler` instance may be invoked concurrently for different
entities. For a specific entity (persistenceId)
+ * one change is processed at a time and this `process` method will not be
invoked with the next change for that
+ * entity until after the returned `Future` is completed.
+ */
+ def process(session: R2dbcSession, change: DurableStateChange[A]):
Future[Done]
+
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 8b45992..ebc920a 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -13,11 +13,14 @@
package org.apache.pekko.persistence.r2dbc.state.scaladsl
+import java.lang
import java.time.Instant
import java.util
+import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.{ Duration, FiniteDuration }
+import scala.util.control.NonFatal
import org.apache.pekko
import pekko.Done
@@ -25,25 +28,36 @@ import pekko.NotUsed
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.persistence.Persistence
+import pekko.persistence.query.DeletedDurableState
+import pekko.persistence.query.DurableStateChange
+import pekko.persistence.query.NoOffset
+import pekko.persistence.query.UpdatedDurableState
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.StateSettings
+import pekko.persistence.r2dbc.internal.AdditionalColumnFactory
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
+import pekko.persistence.r2dbc.internal.ChangeHandlerFactory
import pekko.persistence.r2dbc.internal.InstantFactory
import pekko.persistence.r2dbc.internal.PayloadCodec
import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
import pekko.persistence.r2dbc.internal.PayloadCodec.RichStatement
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.session.scaladsl.R2dbcSession
+import pekko.persistence.r2dbc.state.ChangeHandlerException
+import pekko.persistence.r2dbc.state.scaladsl.AdditionalColumn
+import pekko.persistence.r2dbc.state.scaladsl.ChangeHandler
import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao
import pekko.persistence.typed.PersistenceId
import pekko.stream.scaladsl.Source
import com.typesafe.config.Config
+import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.Row
import io.r2dbc.spi.R2dbcDataIntegrityViolationException
+import io.r2dbc.spi.Row
import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -68,6 +82,12 @@ import org.slf4j.LoggerFactory
override def seqNr: Long = revision
}
+ private final case class EvaluatedAdditionalColumnBindings(
+ additionalColumn: AdditionalColumn[_, _],
+ binding: AdditionalColumn.Binding[_])
+
+ private val FutureDone: Future[Done] = Future.successful(Done)
+
def fromConfig(
settings: StateSettings,
config: Config
@@ -105,9 +125,26 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
protected val stateTable = settings.durableStateTableWithSchema
protected implicit val statePayloadCodec: PayloadCodec =
settings.durableStatePayloadCodec
- private val selectStateSql: String = sql"""
+ private lazy val additionalColumns: Map[String,
immutable.IndexedSeq[AdditionalColumn[Any, Any]]] = {
+ settings.durableStateAdditionalColumnClasses.map { case (entityType,
columnClasses) =>
+ val instances = columnClasses.map(fqcn =>
AdditionalColumnFactory.create(system, fqcn))
+ entityType -> instances
+ }
+ }
+
+ private lazy val changeHandlers: Map[String, ChangeHandler[Any]] = {
+ settings.durableStateChangeHandlerClasses.map { case (entityType, fqcn) =>
+ val handler = ChangeHandlerFactory.create(system, fqcn)
+ entityType -> handler
+ }
+ }
+
+ private def selectStateSql(entityType: String): String = {
+ val table = settings.getDurableStateTableWithSchema(entityType)
+ sql"""
SELECT revision, state_ser_id, state_ser_manifest, state_payload,
db_timestamp
- FROM $stateTable WHERE persistence_id = ?"""
+ FROM $table WHERE persistence_id = ?"""
+ }
protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
sql"""
@@ -128,52 +165,111 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
}
}
- private val insertStateSql: String = sql"""
- INSERT INTO $stateTable
- (slice, entity_type, persistence_id, revision, state_ser_id,
state_ser_manifest, state_payload, tags, db_timestamp)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, $transactionTimestampSql)"""
+ private def insertStateSql(
+ entityType: String,
+ additionalBindings:
immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = {
+ val table = settings.getDurableStateTableWithSchema(entityType)
+ val additionalCols = additionalInsertColumns(additionalBindings)
+ val additionalParams = additionalInsertParameters(additionalBindings)
+ sql"""
+ INSERT INTO $table
+ (slice, entity_type, persistence_id, revision, state_ser_id,
state_ser_manifest, state_payload, tags$additionalCols, db_timestamp)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?$additionalParams,
$transactionTimestampSql)"""
+ }
+
+ private def additionalInsertColumns(
+ additionalBindings:
immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = {
+ if (additionalBindings.isEmpty) ""
+ else {
+ val strB = new lang.StringBuilder()
+ additionalBindings.foreach {
+ case EvaluatedAdditionalColumnBindings(c, _:
AdditionalColumn.BindValue[_]) =>
+ strB.append(", ").append(c.columnName)
+ case EvaluatedAdditionalColumnBindings(c, AdditionalColumn.BindNull) =>
+ strB.append(", ").append(c.columnName)
+ case EvaluatedAdditionalColumnBindings(_, AdditionalColumn.Skip) =>
+ }
+ strB.toString
+ }
+ }
+
+ private def additionalInsertParameters(
+ additionalBindings:
immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = {
+ if (additionalBindings.isEmpty) ""
+ else {
+ val strB = new lang.StringBuilder()
+ additionalBindings.foreach {
+ case EvaluatedAdditionalColumnBindings(_, _:
AdditionalColumn.BindValue[_]) |
+ EvaluatedAdditionalColumnBindings(_, AdditionalColumn.BindNull) =>
+ strB.append(", ?")
+ case EvaluatedAdditionalColumnBindings(_, AdditionalColumn.Skip) =>
+ }
+ strB.toString
+ }
+ }
+
+ private def updateStateSql(
+ entityType: String,
+ updateTags: Boolean,
+ additionalBindings:
immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = {
+ val table = settings.getDurableStateTableWithSchema(entityType)
- private def updateStateSql(updateTags: Boolean): String = {
val timestamp =
if (settings.dbTimestampMonotonicIncreasing)
s"$transactionTimestampSql"
else
s"GREATEST($transactionTimestampSql, " +
- s"(SELECT db_timestamp + '1 microsecond'::interval FROM $stateTable
WHERE persistence_id = ? AND revision = ?))"
+ s"(SELECT db_timestamp + '1 microsecond'::interval FROM $table WHERE
persistence_id = ? AND revision = ?))"
val revisionCondition =
if (settings.durableStateAssertSingleWriter) " AND revision = ?"
else ""
- val tags = if (updateTags) "tags = ?," else ""
+ val tags = if (updateTags) ", tags = ?" else ""
+ val additionalParams = additionalUpdateParameters(additionalBindings)
sql"""
- UPDATE $stateTable
- SET revision = ?, state_ser_id = ?, state_ser_manifest = ?,
state_payload = ?, $tags db_timestamp = $timestamp
+ UPDATE $table
+ SET revision = ?, state_ser_id = ?, state_ser_manifest = ?,
state_payload = ?$tags$additionalParams, db_timestamp = $timestamp
WHERE persistence_id = ?
$revisionCondition"""
}
- private val hardDeleteStateSql: String =
- sql"DELETE from $stateTable WHERE persistence_id = ?"
+ private def additionalUpdateParameters(
+ additionalBindings:
immutable.IndexedSeq[EvaluatedAdditionalColumnBindings]): String = {
+ if (additionalBindings.isEmpty) ""
+ else {
+ val strB = new lang.StringBuilder()
+ additionalBindings.foreach {
+ case EvaluatedAdditionalColumnBindings(col, _:
AdditionalColumn.BindValue[_]) =>
+ strB.append(", ").append(col.columnName).append(" = ?")
+ case EvaluatedAdditionalColumnBindings(col, AdditionalColumn.BindNull)
=>
+ strB.append(", ").append(col.columnName).append(" = ?")
+ case EvaluatedAdditionalColumnBindings(_, AdditionalColumn.Skip) =>
+ }
+ strB.toString
+ }
+ }
- private val deleteStateWithRevisionSql: String =
- sql"DELETE from $stateTable WHERE persistence_id = ? AND revision = ?"
+ private def hardDeleteStateSql(entityType: String): String = {
+ val table = settings.getDurableStateTableWithSchema(entityType)
+ sql"DELETE from $table WHERE persistence_id = ?"
+ }
private val currentDbTimestampSql =
sql"SELECT transaction_timestamp() AS db_timestamp"
- private val allPersistenceIdsSql =
- sql"SELECT persistence_id from $stateTable ORDER BY persistence_id LIMIT ?"
+ private def allPersistenceIdsSql(table: String): String =
+ sql"SELECT persistence_id from $table ORDER BY persistence_id LIMIT ?"
- private val persistenceIdsForEntityTypeSql =
- sql"SELECT persistence_id from $stateTable WHERE persistence_id LIKE ?
ORDER BY persistence_id LIMIT ?"
+ private def persistenceIdsForEntityTypeSql(table: String): String =
+ sql"SELECT persistence_id from $table WHERE persistence_id LIKE ? ORDER BY
persistence_id LIMIT ?"
- private val allPersistenceIdsAfterSql =
- sql"SELECT persistence_id from $stateTable WHERE persistence_id > ? ORDER
BY persistence_id LIMIT ?"
+ private def allPersistenceIdsAfterSql(table: String): String =
+ sql"SELECT persistence_id from $table WHERE persistence_id > ? ORDER BY
persistence_id LIMIT ?"
- private val persistenceIdsForEntityTypeAfterSql =
- sql"SELECT persistence_id from $stateTable WHERE persistence_id LIKE ? AND
persistence_id > ? ORDER BY persistence_id LIMIT ?"
+ private def persistenceIdsForEntityTypeAfterSql(table: String): String =
+ sql"SELECT persistence_id from $table WHERE persistence_id LIKE ? AND
persistence_id > ? ORDER BY persistence_id LIMIT ?"
protected def stateBySlicesRangeSql(
maxDbTimestampParam: Boolean,
@@ -207,10 +303,11 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
}
def readState(persistenceId: String): Future[Option[SerializedStateRow]] = {
+ val entityType = PersistenceId.extractEntityType(persistenceId)
r2dbcExecutor.selectOne(s"select [$persistenceId]")(
connection =>
connection
- .createStatement(selectStateSql)
+ .createStatement(selectStateSql(entityType))
.bind(0, persistenceId),
row =>
SerializedStateRow(
@@ -234,7 +331,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
Option(rowPayload)
}
- def upsertState(state: SerializedStateRow): Future[Done] = {
+ def upsertState(state: SerializedStateRow, value: Any): Future[Done] = {
require(state.revision > 0)
def bindTags(stmt: Statement, i: Int): Statement = {
@@ -244,61 +341,120 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
stmt.bind(i, state.tags.toArray)
}
+ var bindIdx = 0
+ def getAndIncIndex(): Int = {
+ val i = bindIdx
+ bindIdx += 1
+ i
+ }
+
+ def bindAdditionalColumns(
+ stmt: Statement,
+ additionalBindings: IndexedSeq[EvaluatedAdditionalColumnBindings]):
Statement = {
+ additionalBindings.foreach {
+ case EvaluatedAdditionalColumnBindings(_,
AdditionalColumn.BindValue(v)) =>
+ stmt.bind(getAndIncIndex(), v)
+ case EvaluatedAdditionalColumnBindings(col, AdditionalColumn.BindNull)
=>
+ stmt.bindNull(getAndIncIndex(), col.fieldClass)
+ case EvaluatedAdditionalColumnBindings(_, AdditionalColumn.Skip) =>
+ }
+ stmt
+ }
+
+ def change =
+ new UpdatedDurableState[Any](state.persistenceId, state.revision, value,
NoOffset, EmptyDbTimestamp.toEpochMilli)
+
+ val entityType = PersistenceId.extractEntityType(state.persistenceId)
+
val result = {
+ val additionalBindings = additionalColumns.get(entityType) match {
+ case None => Vector.empty[EvaluatedAdditionalColumnBindings]
+ case Some(columns) =>
+ val slice = persistenceExt.sliceForPersistenceId(state.persistenceId)
+ val upsert = AdditionalColumn.Upsert(state.persistenceId,
entityType, slice, state.revision, value)
+ columns.map(c => EvaluatedAdditionalColumnBindings(c,
c.bind(upsert)))
+ }
+
if (state.revision == 1) {
- val entityType = PersistenceId.extractEntityType(state.persistenceId)
val slice = persistenceExt.sliceForPersistenceId(state.persistenceId)
- r2dbcExecutor
- .updateOne(s"insert [${state.persistenceId}]") { connection =>
- val stmt = connection
- .createStatement(insertStateSql)
- .bind(0, slice)
- .bind(1, entityType)
- .bind(2, state.persistenceId)
- .bind(3, state.revision)
- .bind(4, state.serId)
- .bind(5, state.serManifest)
- .bindPayloadOption(6, state.payload)
- bindTags(stmt, 7)
- }
- .recoverWith { case _: R2dbcDataIntegrityViolationException =>
+ def insertStatement(connection: Connection): Statement = {
+ val stmt = connection
+ .createStatement(insertStateSql(entityType, additionalBindings))
+ .bind(getAndIncIndex(), slice)
+ .bind(getAndIncIndex(), entityType)
+ .bind(getAndIncIndex(), state.persistenceId)
+ .bind(getAndIncIndex(), state.revision)
+ .bind(getAndIncIndex(), state.serId)
+ .bind(getAndIncIndex(), state.serManifest)
+ .bindPayloadOption(getAndIncIndex(), state.payload)
+ bindTags(stmt, getAndIncIndex())
+ bindAdditionalColumns(stmt, additionalBindings)
+ }
+
+ def recoverDataIntegrityViolation[A](f: Future[A]): Future[A] =
+ f.recoverWith { case _: R2dbcDataIntegrityViolationException =>
Future.failed(
new IllegalStateException(
s"Insert failed: durable state for persistence id
[${state.persistenceId}] already exists"))
}
+
+ changeHandlers.get(entityType) match {
+ case None =>
+ recoverDataIntegrityViolation(r2dbcExecutor.updateOne(s"insert
[${state.persistenceId}]")(insertStatement))
+ case Some(handler) =>
+ r2dbcExecutor.withConnection(s"insert [${state.persistenceId}]
with change handler") { connection =>
+ for {
+ updatedRows <-
recoverDataIntegrityViolation(R2dbcExecutor.updateOneInTx(insertStatement(connection)))
+ _ <- processChange(handler, connection, change)
+ } yield updatedRows
+ }
+ }
} else {
val previousRevision = state.revision - 1
- r2dbcExecutor.updateOne(s"update [${state.persistenceId}]") {
connection =>
+ def updateStatement(connection: Connection): Statement = {
val stmt = connection
- .createStatement(updateStateSql(updateTags = true))
- .bind(0, state.revision)
- .bind(1, state.serId)
- .bind(2, state.serManifest)
- .bindPayloadOption(3, state.payload)
- bindTags(stmt, 4)
+ .createStatement(updateStateSql(entityType, updateTags = true,
additionalBindings))
+ .bind(getAndIncIndex(), state.revision)
+ .bind(getAndIncIndex(), state.serId)
+ .bind(getAndIncIndex(), state.serManifest)
+ .bindPayloadOption(getAndIncIndex(), state.payload)
+ bindTags(stmt, getAndIncIndex())
+ bindAdditionalColumns(stmt, additionalBindings)
if (settings.dbTimestampMonotonicIncreasing) {
if (settings.durableStateAssertSingleWriter)
stmt
- .bind(5, state.persistenceId)
- .bind(6, previousRevision)
+ .bind(getAndIncIndex(), state.persistenceId)
+ .bind(getAndIncIndex(), previousRevision)
else
stmt
- .bind(5, state.persistenceId)
+ .bind(getAndIncIndex(), state.persistenceId)
} else {
stmt
- .bind(5, state.persistenceId)
- .bind(6, previousRevision)
- .bind(7, state.persistenceId)
+ .bind(getAndIncIndex(), state.persistenceId)
+ .bind(getAndIncIndex(), previousRevision)
+ .bind(getAndIncIndex(), state.persistenceId)
if (settings.durableStateAssertSingleWriter)
- stmt.bind(8, previousRevision)
+ stmt.bind(getAndIncIndex(), previousRevision)
else
stmt
}
}
+
+ changeHandlers.get(entityType) match {
+ case None =>
+ r2dbcExecutor.updateOne(s"update
[${state.persistenceId}]")(updateStatement)
+ case Some(handler) =>
+ r2dbcExecutor.withConnection(s"update [${state.persistenceId}]
with change handler") { connection =>
+ for {
+ updatedRows <-
R2dbcExecutor.updateOneInTx(updateStatement(connection))
+ _ <- processChange(handler, connection, change)
+ } yield updatedRows
+ }
+ }
}
}
@@ -313,58 +469,82 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
}
}
- private def hardDeleteState(persistenceId: String): Future[Long] = {
- val result =
- r2dbcExecutor.updateOne(s"hard delete [$persistenceId]") { connection =>
- connection
- .createStatement(hardDeleteStateSql)
- .bind(0, persistenceId)
- }
+ private def processChange(
+ handler: ChangeHandler[Any],
+ connection: Connection,
+ change: DurableStateChange[Any]): Future[Done] = {
+ val session = new R2dbcSession(connection)
- if (log.isDebugEnabled())
- result.foreach(_ => log.debug("Hard deleted durable state for
persistenceId [{}]", persistenceId))
+ def excMessage(cause: Throwable): String = {
+ val (changeType, revision) = change match {
+ case upd: UpdatedDurableState[_] => "update" -> upd.revision
+ case del: DeletedDurableState[_] => "delete" -> del.revision
+ }
+ s"Change handler $changeType failed for [${change.persistenceId}]
revision [$revision], due to ${cause.getMessage}"
+ }
- result
+ try handler.process(session, change).recoverWith { case NonFatal(cause) =>
+ Future.failed[Done](new ChangeHandlerException(excMessage(cause),
cause))
+ }
+ catch {
+ case NonFatal(cause) => throw new
ChangeHandlerException(excMessage(cause), cause)
+ }
}
- /**
- * @param persistenceId The persistence id for the object
- * @param revision The revision to delete
- * @return The number of rows deleted
- * @since 1.1.0
- */
- def deleteStateForRevision(persistenceId: String, revision: Long):
Future[Long] = {
+ def deleteState(persistenceId: String, revision: Long): Future[Done] = {
if (revision == 0) {
- hardDeleteState(persistenceId)
+ hardDeleteState(persistenceId).map(_ => Done)(ExecutionContext.parasitic)
} else {
val result = {
+ val entityType = PersistenceId.extractEntityType(persistenceId)
+ def change =
+ new DeletedDurableState[Any](persistenceId, revision, NoOffset,
EmptyDbTimestamp.toEpochMilli)
if (revision == 1) {
- val entityType = PersistenceId.extractEntityType(persistenceId)
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
- r2dbcExecutor
- .updateOne(s"insert delete marker [$persistenceId]") { connection
=>
- connection
- .createStatement(insertStateSql)
- .bind(0, slice)
- .bind(1, entityType)
- .bind(2, persistenceId)
- .bind(3, revision)
- .bind(4, 0)
- .bind(5, "")
- .bindPayloadOption(6, None)
- .bindNull(7, classOf[Array[String]])
- }
- .recoverWith { case _: R2dbcDataIntegrityViolationException =>
+ def insertDeleteMarkerStatement(connection: Connection): Statement =
{
+ connection
+ .createStatement(
+ insertStateSql(entityType, Vector.empty)
+ ) // FIXME should the additional columns be cleared (null)? Then
they must allow NULL
+ .bind(0, slice)
+ .bind(1, entityType)
+ .bind(2, persistenceId)
+ .bind(3, revision)
+ .bind(4, 0)
+ .bind(5, "")
+ .bindPayloadOption(6, None)
+ .bindNull(7, classOf[Array[String]])
+ }
+
+ def recoverDataIntegrityViolation[A](f: Future[A]): Future[A] =
+ f.recoverWith { case _: R2dbcDataIntegrityViolationException =>
Future.failed(new IllegalStateException(
s"Insert delete marker with revision 1 failed: durable state
for persistence id [$persistenceId] already exists"))
}
+
+ val changeHandler = changeHandlers.get(entityType)
+ val changeHandlerHint = changeHandler.map(_ => " with change
handler").getOrElse("")
+
+ r2dbcExecutor.withConnection(s"insert delete marker
[$persistenceId]$changeHandlerHint") { connection =>
+ for {
+ updatedRows <- recoverDataIntegrityViolation(
+
R2dbcExecutor.updateOneInTx(insertDeleteMarkerStatement(connection)))
+ _ <- changeHandler match {
+ case None => FutureDone
+ case Some(handler) => processChange(handler, connection,
change)
+ }
+ } yield updatedRows
+ }
+
} else {
val previousRevision = revision - 1
- r2dbcExecutor.updateOne(s"delete [$persistenceId]") { connection =>
+ def updateStatement(connection: Connection): Statement = {
val stmt = connection
- .createStatement(updateStateSql(updateTags = false))
+ .createStatement(
+ updateStateSql(entityType, updateTags = false, Vector.empty)
+ ) // FIXME should the additional columns be cleared (null)? Then
they must allow NULL
.bind(0, revision)
.bind(1, 0)
.bind(2, "")
@@ -390,12 +570,63 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
stmt
}
}
+
+ val changeHandler = changeHandlers.get(entityType)
+ val changeHandlerHint = changeHandler.map(_ => " with change
handler").getOrElse("")
+
+ r2dbcExecutor.withConnection(s"delete
[$persistenceId]$changeHandlerHint") { connection =>
+ for {
+ updatedRows <-
R2dbcExecutor.updateOneInTx(updateStatement(connection))
+ _ <- changeHandler match {
+ case None => FutureDone
+ case Some(handler) => processChange(handler, connection,
change)
+ }
+ } yield updatedRows
+ }
+ }
+ }
+
+ result.map { updatedRows =>
+ if (updatedRows != 1)
+ throw new IllegalStateException(
+ s"Delete failed: durable state for persistence id [$persistenceId]
could not be updated to revision [$revision]")
+ else {
+ log.debug("Deleted durable state for persistenceId [{}] to revision
[{}]", persistenceId, revision)
+ Done
}
}
- result
+
}
}
+ private def hardDeleteState(persistenceId: String): Future[Long] = {
+ val entityType = PersistenceId.extractEntityType(persistenceId)
+
+ val changeHandler = changeHandlers.get(entityType)
+ val changeHandlerHint = changeHandler.map(_ => " with change
handler").getOrElse("")
+
+ val result =
+ r2dbcExecutor.withConnection(s"hard delete
[$persistenceId]$changeHandlerHint") { connection =>
+ for {
+ updatedRows <- R2dbcExecutor.updateOneInTx(
+ connection
+ .createStatement(hardDeleteStateSql(entityType))
+ .bind(0, persistenceId))
+ _ <- changeHandler match {
+ case None => FutureDone
+ case Some(handler) =>
+ val change = new DeletedDurableState[Any](persistenceId, 0L,
NoOffset, EmptyDbTimestamp.toEpochMilli)
+ processChange(handler, connection, change)
+ }
+ } yield updatedRows
+ }
+
+ if (log.isDebugEnabled())
+ result.foreach(_ => log.debug("Hard deleted durable state for
persistenceId [{}]", persistenceId))
+
+ result
+ }
+
override def currentDbTimestamp(): Future[Instant] = {
r2dbcExecutor
.selectOne("select current db timestamp")(
@@ -477,19 +708,20 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
}
def persistenceIds(entityType: String, afterId: Option[String], limit:
Long): Source[String, NotUsed] = {
+ val table = settings.getDurableStateTableWithSchema(entityType)
val likeStmtPostfix = PersistenceId.DefaultSeparator + "%"
val result = r2dbcExecutor.select(s"select persistenceIds by entity type")(
connection =>
afterId match {
case Some(after) =>
connection
- .createStatement(persistenceIdsForEntityTypeAfterSql)
+ .createStatement(persistenceIdsForEntityTypeAfterSql(table))
.bind(0, entityType + likeStmtPostfix)
.bind(1, after)
.bind(2, limit)
case None =>
connection
- .createStatement(persistenceIdsForEntityTypeSql)
+ .createStatement(persistenceIdsForEntityTypeSql(table))
.bind(0, entityType + likeStmtPostfix)
.bind(1, limit)
},
@@ -502,25 +734,69 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
}
def persistenceIds(afterId: Option[String], limit: Long): Source[String,
NotUsed] = {
+ if (settings.durableStateTableByEntityTypeWithSchema.isEmpty)
+ persistenceIdsFromTable(afterId, limit, stateTable)
+ else {
+ def readFromCustomTables(
+ acc: immutable.IndexedSeq[String],
+ remainingTables: Vector[String]):
Future[immutable.IndexedSeq[String]] = {
+ if (acc.size >= limit) {
+ Future.successful(acc)
+ } else if (remainingTables.isEmpty) {
+ Future.successful(acc)
+ } else {
+ readPersistenceIds(afterId, limit, remainingTables.head).flatMap {
ids =>
+ readFromCustomTables(acc ++ ids, remainingTables.tail)
+ }
+ }
+ }
+
+ val customTables =
settings.durableStateTableByEntityTypeWithSchema.toVector.sortBy(_._1).map(_._2)
+ val ids = for {
+ fromDefaultTable <- readPersistenceIds(afterId, limit, stateTable)
+ fromCustomTables <- readFromCustomTables(Vector.empty, customTables)
+ } yield {
+ (fromDefaultTable ++ fromCustomTables).sorted
+ }
+
+
Source.futureSource(ids.map(Source(_))).take(limit).mapMaterializedValue(_ =>
NotUsed)
+ }
+ }
+
+ /**
+ * INTERNAL API
+ */
+ @InternalApi private[pekko] def persistenceIdsFromTable(
+ afterId: Option[String],
+ limit: Long,
+ table: String): Source[String, NotUsed] = {
+ val result = readPersistenceIds(afterId, limit, table)
+
+ Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ =>
NotUsed)
+ }
+
+ private def readPersistenceIds(
+ afterId: Option[String],
+ limit: Long,
+ table: String): Future[immutable.IndexedSeq[String]] = {
val result = r2dbcExecutor.select(s"select persistenceIds")(
connection =>
afterId match {
case Some(after) =>
connection
- .createStatement(allPersistenceIdsAfterSql)
+ .createStatement(allPersistenceIdsAfterSql(table))
.bind(0, after)
.bind(1, limit)
case None =>
connection
- .createStatement(allPersistenceIdsSql)
+ .createStatement(allPersistenceIdsSql(table))
.bind(0, limit)
},
row => row.get("persistence_id", classOf[String]))
if (log.isDebugEnabled)
result.foreach(rows => log.debug("Read [{}] persistence ids", rows.size))
-
- Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ =>
NotUsed)
+ result
}
/**
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateExceptionSupport.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateExceptionSupport.scala
deleted file mode 100644
index fb4f5e9..0000000
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateExceptionSupport.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pekko.persistence.r2dbc.state.scaladsl
-
-import java.lang.invoke.{ MethodHandles, MethodType }
-
-import scala.util.Try
-
-/**
- * INTERNAL API
- *
- * Support for creating a `DeleteRevisionException`if the class is
- * available on the classpath. Pekko 1.0 does not have this class, but
- * it is added in Pekko 1.1.
- */
-private[state] object DurableStateExceptionSupport {
- val DeleteRevisionExceptionClass =
- "org.apache.pekko.persistence.state.exception.DeleteRevisionException"
-
- private def exceptionClassOpt: Option[Class[_]] =
- Try(Class.forName(DeleteRevisionExceptionClass)).toOption
-
- private val constructorOpt = exceptionClassOpt.map { clz =>
- val mt = MethodType.methodType(classOf[Unit], classOf[String])
- MethodHandles.publicLookup().findConstructor(clz, mt)
- }
-
- def createDeleteRevisionExceptionIfSupported(message: String):
Option[Exception] =
- constructorOpt.map { constructor =>
- constructor.invoke(message).asInstanceOf[Exception]
- }
-
-}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index c860b2a..46924c7 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -45,7 +45,11 @@ import org.slf4j.LoggerFactory
object R2dbcDurableStateStore {
val Identifier = "pekko.persistence.r2dbc.state"
- private final case class PersistenceIdsQueryState(queryCount: Int, rowCount:
Int, latestPid: String)
+ private final case class PersistenceIdsQueryState(
+ queryCount: Int,
+ rowCount: Int,
+ latestPid: String,
+ tables: List[String])
}
class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config,
cfgPath: String)
@@ -126,27 +130,24 @@ class R2dbcDurableStateStore[A](system:
ExtendedActorSystem, config: Config, cfg
manifest,
if (tag.isEmpty) Set.empty else Set(tag))
- stateDao.upsertState(serializedRow)
+ stateDao.upsertState(serializedRow, value)
}
+ @deprecated(message = "Use the deleteObject overload with revision
instead.", since = "1.0.0")
override def deleteObject(persistenceId: String): Future[Done] =
- stateDao.deleteStateForRevision(persistenceId, 0L)
- .map(_ => Done)(ExecutionContext.parasitic)
+ deleteObject(persistenceId, revision = 0)
+ /**
+ * Delete the value, which will fail with `IllegalStateException` if the
existing stored `revision` + 1 isn't equal to
+ * the given `revision`. This optimistic locking check can be disabled with
configuration `assert-single-writer`. The
+ * stored revision for the persistenceId is updated and next call to
[[getObject]] will return the revision, but with
+ * no value.
+ *
+ * If the given revision is `0` it will fully delete the value and revision
from the database without any optimistic
+ * locking check. Next call to [[getObject]] will then return revision 0 and
no value.
+ */
override def deleteObject(persistenceId: String, revision: Long):
Future[Done] = {
- stateDao.deleteStateForRevision(persistenceId, revision).map { count =>
- if (count != 1) {
- val msg = if (count == 0) {
- s"Failed to delete object with persistenceId [$persistenceId] and
revision [$revision]"
- } else {
- s"Delete object succeeded for persistenceId [$persistenceId] and
revision [$revision] but more than one row was affected ($count rows)"
- }
- // Use DeleteRevisionException if available (Pekko 1.1+), otherwise
fall back to IllegalStateException
- throw
DurableStateExceptionSupport.createDeleteRevisionExceptionIfSupported(msg)
- .getOrElse(new IllegalStateException(msg))
- }
- Done
- }(ExecutionContext.parasitic)
+ stateDao.deleteState(persistenceId, revision)
}
override def sliceForPersistenceId(persistenceId: String): Int =
@@ -198,19 +199,28 @@ class R2dbcDurableStateStore[A](system:
ExtendedActorSystem, config: Config, cfg
state.copy(rowCount = state.rowCount + 1, latestPid = pid)
def nextQuery(state: PersistenceIdsQueryState): (PersistenceIdsQueryState,
Option[Source[String, NotUsed]]) = {
- if (state.queryCount == 0L || state.rowCount >=
persistenceIdsBufferSize) {
- val newState = state.copy(rowCount = 0, queryCount = state.queryCount
+ 1)
+ def next(newState: PersistenceIdsQueryState) = {
+ val newState2 = newState.copy(rowCount = 0, queryCount =
newState.queryCount + 1)
- if (state.queryCount != 0 && log.isDebugEnabled())
+ if (newState.queryCount != 0 && log.isDebugEnabled())
log.debug(
"persistenceIds query [{}] after [{}]. Found [{}] rows in previous
query.",
- state.queryCount: java.lang.Integer,
- state.latestPid,
- state.rowCount: java.lang.Integer)
+ newState.queryCount: java.lang.Integer,
+ newState.latestPid,
+ newState.rowCount: java.lang.Integer)
- newState -> Some(
+ val afterPid = if (newState.latestPid == "") None else
Some(newState.latestPid)
+
+ newState2 -> Some(
stateDao
- .persistenceIds(if (state.latestPid == "") None else
Some(state.latestPid), persistenceIdsBufferSize))
+ .persistenceIdsFromTable(afterPid, persistenceIdsBufferSize,
newState.tables.head))
+ }
+
+ if (state.queryCount == 0L || state.rowCount >=
persistenceIdsBufferSize) {
+ next(state)
+ } else if (state.tables.tail.nonEmpty) {
+ // continue with next custom table
+ next(state.copy(tables = state.tables.tail, latestPid = ""))
} else {
if (log.isDebugEnabled)
log.debug(
@@ -222,8 +232,11 @@ class R2dbcDurableStateStore[A](system:
ExtendedActorSystem, config: Config, cfg
}
}
+ val customTables =
settings.durableStateTableByEntityTypeWithSchema.toList.sortBy(_._1).map(_._2)
+ val tables = settings.durableStateTableWithSchema :: customTables
+
ContinuousQuery[PersistenceIdsQueryState, String](
- initialState = PersistenceIdsQueryState(0, 0, ""),
+ initialState = PersistenceIdsQueryState(0, 0, "", tables),
updateState = updateState,
delayNextQuery = _ => None,
nextQuery = state => nextQuery(state))
diff --git
a/core/src/test/java/org/apache/pekko/persistence/r2dbc/state/JavadslChangeHandler.java
b/core/src/test/java/org/apache/pekko/persistence/r2dbc/state/JavadslChangeHandler.java
new file mode 100644
index 0000000..b1aa91a
--- /dev/null
+++
b/core/src/test/java/org/apache/pekko/persistence/r2dbc/state/JavadslChangeHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.state;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.persistence.query.DurableStateChange;
+import org.apache.pekko.persistence.query.UpdatedDurableState;
+import org.apache.pekko.persistence.r2dbc.session.javadsl.R2dbcSession;
+import org.apache.pekko.persistence.r2dbc.state.javadsl.ChangeHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+public class JavadslChangeHandler implements ChangeHandler<String> {
+
+ private final String insertSql;
+
+ public JavadslChangeHandler(ActorSystem<?> system) {
+ String dialect =
system.settings().config().getString("pekko.persistence.r2dbc.dialect");
+ // MySQL uses ? as parameter markers; Postgres/Yugabyte use $1, $2, $3
+ if ("mysql".equals(dialect)) {
+ this.insertSql = "insert into changes_test (pid, rev, value) values (?,
?, ?)";
+ } else {
+ this.insertSql = "insert into changes_test (pid, rev, value) values ($1,
$2, $3)";
+ }
+ }
+
+ @Override
+ public CompletionStage<Done> process(R2dbcSession session,
DurableStateChange<String> change) {
+ if (change instanceof UpdatedDurableState) {
+ UpdatedDurableState<String> upd = (UpdatedDurableState<String>) change;
+ return session
+ .updateOne(
+ session
+ .createStatement(insertSql)
+ .bind(0, upd.persistenceId())
+ .bind(1, upd.revision())
+ .bind(2, upd.value()))
+ .thenApply(n -> Done.getInstance());
+ } else {
+ return CompletableFuture.completedFuture(Done.getInstance());
+ }
+ }
+}
diff --git
a/core/src/test/java/org/apache/pekko/persistence/r2dbc/state/JavadslColumn.java
b/core/src/test/java/org/apache/pekko/persistence/r2dbc/state/JavadslColumn.java
new file mode 100644
index 0000000..1dc63ce
--- /dev/null
+++
b/core/src/test/java/org/apache/pekko/persistence/r2dbc/state/JavadslColumn.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.state;
+
+import org.apache.pekko.persistence.r2dbc.state.javadsl.AdditionalColumn;
+
+public class JavadslColumn extends AdditionalColumn<String, Integer> {
+ @Override
+ public Class<Integer> fieldClass() {
+ return Integer.class;
+ }
+
+ @Override
+ public String columnName() {
+ return "col3";
+ }
+
+ @Override
+ public Binding<Integer> bind(Upsert<String> upsert) {
+ if (upsert.value().isEmpty())
+ return AdditionalColumn.bindNull();
+ else if (upsert.value().equals("SKIP"))
+ return AdditionalColumn.skip();
+ else
+ return new AdditionalColumn.BindValue<>(upsert.value().length());
+ }
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
index acd9b65..ad795b6 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
@@ -13,6 +13,7 @@
package org.apache.pekko.persistence.r2dbc.state
+import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.pekko
@@ -36,7 +37,14 @@ class CurrentPersistenceIdsQuerySpec
extends ScalaTestWithActorTestKit(
ConfigFactory
.parseString("""
- pekko.persistence.r2dbc.query.persistence-ids.buffer-size = 20
+ pekko.persistence.r2dbc {
+ query.persistence-ids.buffer-size = 20
+ state {
+ custom-table {
+ "CustomEntity" = durable_state_test
+ }
+ }
+ }
""")
.withFallback(TestConfig.config))
with AnyWordSpecLike
@@ -60,9 +68,24 @@ class CurrentPersistenceIdsQuerySpec
PersistenceId(entityTypes(entityTypeId - 1), "p" +
zeros.drop(n.toString.length) + n)))
}
+ private val customTable =
stateSettings.getDurableStateTableWithSchema("CustomEntity")
+ private val customEntityType = "CustomEntity"
+ private val customPid1 = nextPid(customEntityType)
+ private val customPid2 = nextPid(customEntityType)
+
override protected def beforeAll(): Unit = {
super.beforeAll()
+ Await.result(
+ r2dbcExecutor.executeDdl("beforeAll create durable_state_test")(
+ _.createStatement(
+ s"create table if not exists $customTable as select * from
durable_state where persistence_id = ''")),
+ 20.seconds)
+
+ Await.result(
+ r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete
from $customTable")),
+ 10.seconds)
+
val probe = createTestProbe[Done]()
pids.foreach { pid =>
val persister = spawn(TestActors.DurableStatePersister(pid))
@@ -73,6 +96,18 @@ class CurrentPersistenceIdsQuerySpec
probe.receiveMessages(numberOfPids * 2, 30.seconds) // ack + stop done
}
+ private def createPidsInCustomTable(): Unit = {
+ val probe = createTestProbe[Done]()
+ val persister1 = spawn(TestActors.DurableStatePersister(customPid1))
+ persister1 ! DurableStatePersister.Persist("s-1")
+ persister1 ! DurableStatePersister.Stop(probe.ref)
+ val persister2 = spawn(TestActors.DurableStatePersister(customPid2))
+ persister2 ! DurableStatePersister.Persist("s-1")
+ persister2 ! DurableStatePersister.Stop(probe.ref)
+ probe.expectMessage(Done)
+ probe.expectMessage(Done)
+ }
+
"Durable State persistenceIds" should {
"retrieve all ids" in {
val result = store.currentPersistenceIds().runWith(Sink.seq).futureValue
@@ -102,6 +137,44 @@ class CurrentPersistenceIdsQuerySpec
result shouldBe pids.filter(_.entityTypeHint == entityType).slice(10,
17).map(_.id)
}
+ "include pids from custom table in all ids" in {
+ createPidsInCustomTable()
+ val result = store.currentPersistenceIds().runWith(Sink.seq).futureValue
+ // note that custom tables always come afterwards, i.e. not strictly
sorted on the pids (but that should be ok)
+ result shouldBe (pids.map(_.id) :+ customPid1 :+ customPid2)
+ }
+
+ "include pids from custom table in ids afterId" in {
+ createPidsInCustomTable()
+ val result1 = store.currentPersistenceIds(afterId = Some(pids(9).id),
limit = 1000).runWith(Sink.seq).futureValue
+ // custom pids not included because "CustomEntity" < "TestEntity"
+ result1 shouldBe pids.drop(10).map(_.id)
+
+ val result2 = store.currentPersistenceIds(afterId = Some(customPid1),
limit = 1000).runWith(Sink.seq).futureValue
+ result2 shouldBe customPid2 +: pids.map(_.id)
+ }
+
+ "include pids from custom table in ids for entity type" in {
+ createPidsInCustomTable()
+ val result =
+ store
+ .currentPersistenceIds(entityType = customEntityType, afterId =
None, limit = 30)
+ .runWith(Sink.seq)
+ .futureValue
+ result shouldBe Vector(customPid1, customPid2)
+ }
+
+ "include pids from custom table in ids for entity type after id" in {
+ createPidsInCustomTable()
+ val result =
+ store
+ .currentPersistenceIds(entityType = customEntityType, afterId =
Some(customPid1), limit = 7)
+ .runWith(Sink.seq)
+ .futureValue
+
+ result shouldBe Vector(customPid2)
+ }
+
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala
new file mode 100644
index 0000000..c98f5e9
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.state
+
+import java.lang
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestData
+import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.state.scaladsl.AdditionalColumn
+import pekko.persistence.r2dbc.state.scaladsl.AdditionalColumn.BindNull
+import pekko.persistence.r2dbc.state.scaladsl.AdditionalColumn.BindValue
+import pekko.persistence.r2dbc.state.scaladsl.AdditionalColumn.Skip
+import pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore
+import pekko.persistence.state.DurableStateStoreRegistry
+import pekko.persistence.state.scaladsl.GetObjectResult
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.Outcome
+import org.scalatest.Pending
+import org.scalatest.wordspec.AnyWordSpecLike
+
+object DurableStateStoreAdditionalColumnSpec {
+ val config: Config = ConfigFactory
+ .parseString(s"""
+ pekko.persistence.r2dbc.state {
+ custom-table {
+ "CustomEntity" = durable_state_test
+ }
+ additional-columns {
+ "CustomEntity" = ["${classOf[Column1].getName}",
"${classOf[Column2].getName}", "${classOf[
+ JavadslColumn].getName}"]
+ }
+ }
+ """)
+ .withFallback(TestConfig.config)
+
+ val dialect = config.getString("pekko.persistence.r2dbc.dialect")
+
+ class Column1 extends AdditionalColumn[String, String] {
+ override def columnName: String = "col1"
+
+ override def bind(upsert: AdditionalColumn.Upsert[String]):
AdditionalColumn.Binding[String] =
+ if (upsert.value.isEmpty) BindNull
+ else if (upsert.value == "SKIP") Skip
+ else BindValue(upsert.value)
+ }
+
+ class Column2 extends AdditionalColumn[String, Int] {
+ override def columnName: String = "col2"
+
+ override def bind(upsert: AdditionalColumn.Upsert[String]):
AdditionalColumn.Binding[Int] =
+ if (upsert.value.isEmpty) BindNull
+ else if (upsert.value == "SKIP") Skip
+ else BindValue(upsert.value.length)
+ }
+}
+
+class DurableStateStoreAdditionalColumnSpec
+ extends
ScalaTestWithActorTestKit(DurableStateStoreAdditionalColumnSpec.config)
+ with AnyWordSpecLike
+ with TestDbLifecycle
+ with TestData
+ with LogCapturing {
+
+ private val customTable =
stateSettings.getDurableStateTableWithSchema("CustomEntity")
+
+ override def typedSystem: ActorSystem[_] = system
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ if (DurableStateStoreAdditionalColumnSpec.dialect != "mysql") {
+ Await.result(
+ r2dbcExecutor.executeDdl("beforeAll create durable_state_test")(
+ _.createStatement(
+ s"create table if not exists $customTable as select * from
durable_state where persistence_id = ''")),
+ 20.seconds)
+ Await.result(
+ r2dbcExecutor.executeDdl("beforeAll alter durable_state_test")(
+ _.createStatement(s"alter table $customTable add if not exists col1
varchar(256)")),
+ 20.seconds)
+ Await.result(
+ r2dbcExecutor.executeDdl("beforeAll alter durable_state_test")(
+ _.createStatement(s"alter table $customTable add if not exists col2
int")),
+ 20.seconds)
+ Await.result(
+ r2dbcExecutor.executeDdl("beforeAll alter durable_state_test")(
+ _.createStatement(s"alter table $customTable add if not exists col3
int")),
+ 20.seconds)
+ Await.result(
+ r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete
from $customTable")),
+ 10.seconds)
+ }
+ }
+
+ override def withFixture(test: NoArgTest): Outcome =
+ if (DurableStateStoreAdditionalColumnSpec.dialect == "mysql") {
+ Pending // MySQL doesn't support CREATE TABLE AS SELECT or ALTER TABLE
ADD IF NOT EXISTS
+ } else {
+ super.withFixture(test)
+ }
+
+ private val store = DurableStateStoreRegistry(testKit.system)
+
.durableStateStoreFor[R2dbcDurableStateStore[String]](R2dbcDurableStateStore.Identifier)
+
+ private val unusedTag = "n/a"
+
+ private def exists(whereCondition: String): Boolean =
+ r2dbcExecutor
+ .selectOne("count")(
+ _.createStatement(s"select count(*) from $customTable where
$whereCondition"),
+ row => row.get(0, classOf[lang.Long]).longValue())
+ .futureValue
+ .contains(1)
+
+ private def existsInCustomTable(persistenceId: String): Boolean =
+ exists(s"persistence_id = '$persistenceId'")
+
+ private def existsMatchingCol1(persistenceId: String, columnValue: String):
Boolean =
+ exists(s"persistence_id = '$persistenceId' and col1 = '$columnValue'")
+
+ private def existsMatchingCol2(persistenceId: String, columnValue: Int):
Boolean =
+ exists(s"persistence_id = '$persistenceId' and col2 = $columnValue")
+
+ private def existsMatchingCol3(persistenceId: String, columnValue: Int):
Boolean =
+ exists(s"persistence_id = '$persistenceId' and col3 = $columnValue")
+
+ private def existsCol1IsNull(persistenceId: String): Boolean =
+ exists(s"persistence_id = '$persistenceId' and col1 is null")
+
+ private def existsCol2IsNull(persistenceId: String): Boolean =
+ exists(s"persistence_id = '$persistenceId' and col2 is null")
+
+ private def existsCol3IsNull(persistenceId: String): Boolean =
+ exists(s"persistence_id = '$persistenceId' and col3 is null")
+
+ "The R2DBC durable state store" should {
+ "save and retrieve a value in custom table with additional columns" in {
+ val entityType = "CustomEntity"
+ val persistenceId = nextPid(entityType)
+ val value = "Genuinely Collaborative"
+
+ store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
+
+ existsInCustomTable(persistenceId) should be(true)
+ existsMatchingCol1(persistenceId, value) should be(true)
+ existsMatchingCol2(persistenceId, value.length) should be(true)
+ existsMatchingCol3(persistenceId, value.length) should be(true)
+ }
+
+ "update a value in custom table with additional columns" in {
+ val entityType = "CustomEntity"
+ val persistenceId = nextPid(entityType)
+ val value = "Genuinely Collaborative"
+ store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
+
+ val updatedValue = "Open to Feedback"
+ store.upsertObject(persistenceId, 2L, updatedValue,
unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(updatedValue), 2L))
+
+ existsInCustomTable(persistenceId) should be(true)
+ existsMatchingCol1(persistenceId, value) should be(false)
+ existsMatchingCol1(persistenceId, updatedValue) should be(true)
+ existsMatchingCol2(persistenceId, value.length) should be(false)
+ existsMatchingCol2(persistenceId, updatedValue.length) should be(true)
+ existsMatchingCol3(persistenceId, value.length) should be(false)
+ existsMatchingCol3(persistenceId, updatedValue.length) should be(true)
+ }
+
+ "support null binding of additional columns" in {
+ val entityType = "CustomEntity"
+ val persistenceId = nextPid(entityType)
+ val emptyValue = ""
+ store.upsertObject(persistenceId, 1L, emptyValue, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(emptyValue), 1L))
+ existsCol1IsNull(persistenceId) should be(true)
+ existsCol2IsNull(persistenceId) should be(true)
+ existsCol3IsNull(persistenceId) should be(true)
+
+ val updatedValue = "Open to Feedback"
+ store.upsertObject(persistenceId, 2L, updatedValue,
unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(updatedValue), 2L))
+ existsCol1IsNull(persistenceId) should be(false)
+ existsCol2IsNull(persistenceId) should be(false)
+ existsCol3IsNull(persistenceId) should be(false)
+
+ store.upsertObject(persistenceId, 3L, emptyValue, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(emptyValue), 3L))
+ existsCol1IsNull(persistenceId) should be(true)
+ existsCol2IsNull(persistenceId) should be(true)
+ existsCol3IsNull(persistenceId) should be(true)
+ }
+
+ "support skip binding of additional columns" in {
+ val entityType = "CustomEntity"
+ val persistenceId = nextPid(entityType)
+ val value = "Genuinely Collaborative"
+ store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
+ existsMatchingCol1(persistenceId, value) should be(true)
+ existsMatchingCol2(persistenceId, value.length) should be(true)
+ existsMatchingCol3(persistenceId, value.length) should be(true)
+
+ val updatedValue = "SKIP"
+ store.upsertObject(persistenceId, 2L, updatedValue,
unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(updatedValue), 2L))
+ // still same column values
+ existsMatchingCol1(persistenceId, value) should be(true)
+ existsMatchingCol2(persistenceId, value.length) should be(true)
+ existsMatchingCol3(persistenceId, value.length) should be(true)
+ }
+
+ }
+
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala
new file mode 100644
index 0000000..74b954f
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreChangeHandlerSpec.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.state
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.query.DeletedDurableState
+import pekko.persistence.query.DurableStateChange
+import pekko.persistence.query.UpdatedDurableState
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.TestData
+import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.internal.Sql.Interpolation
+import pekko.persistence.r2dbc.session.scaladsl.R2dbcSession
+import pekko.persistence.r2dbc.state.scaladsl.ChangeHandler
+import pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore
+import pekko.persistence.state.DurableStateStoreRegistry
+import pekko.persistence.state.scaladsl.GetObjectResult
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+object DurableStateStoreChangeHandlerSpec {
+ val config: Config = ConfigFactory
+ .parseString(s"""
+ pekko.persistence.r2dbc.state {
+ change-handler {
+ "CustomEntity" = "${classOf[Handler].getName}"
+ "JavadslCustomEntity" = "${classOf[JavadslChangeHandler].getName}"
+ }
+ }
+ """)
+ .withFallback(TestConfig.config)
+
+ val dialect: String = config.getString("pekko.persistence.r2dbc.dialect")
+
+ // MySQL uses ? as parameter markers; Postgres/Yugabyte use $1, $2, $3
+ private val insertSql: String =
+ if (dialect == "mysql") "insert into changes_test (pid, rev, value) values
(?, ?, ?)"
+ else sql"insert into changes_test (pid, rev, value) values (?, ?, ?)"
+
+ class Handler(system: ActorSystem[_]) extends ChangeHandler[String] {
+ private implicit val ec: ExecutionContext = system.executionContext
+
+ override def process(session: R2dbcSession, change:
DurableStateChange[String]): Future[Done] = {
+ change match {
+ case upd: UpdatedDurableState[String] =>
+ if (upd.value == "BOOM")
+ Future.failed(new RuntimeException("BOOM"))
+ else
+ session
+ .updateOne(
+ session
+ .createStatement(insertSql)
+ .bind(0, upd.persistenceId)
+ .bind(1, upd.revision)
+ .bind(2, upd.value))
+ .map(_ => Done)
+
+ case del: DeletedDurableState[String] =>
+ session
+ .updateOne(
+ session
+ .createStatement(insertSql)
+ .bind(0, del.persistenceId)
+ .bind(1, del.revision)
+ .bindNull(2, classOf[String]))
+ .map(_ => Done)
+ }
+ }
+ }
+
+}
+
+class DurableStateStoreChangeHandlerSpec
+ extends
ScalaTestWithActorTestKit(DurableStateStoreChangeHandlerSpec.config)
+ with AnyWordSpecLike
+ with TestDbLifecycle
+ with TestData
+ with LogCapturing {
+
+ private val anotherTable = "changes_test"
+
+ override def typedSystem: ActorSystem[_] = system
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ Await.result(
+ r2dbcExecutor.executeDdl("beforeAll create changes_test")(
+ _.createStatement(
+ s"create table if not exists $anotherTable (pid varchar(256), rev
bigint, value varchar(256))")),
+ 20.seconds)
+ Await.result(
+ r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete
from $anotherTable")),
+ 10.seconds)
+ }
+
+ private val store = DurableStateStoreRegistry(testKit.system)
+
.durableStateStoreFor[R2dbcDurableStateStore[String]](R2dbcDurableStateStore.Identifier)
+
+ private val unusedTag = "n/a"
+
+ private def exists(whereCondition: String): Boolean =
+ r2dbcExecutor
+ .selectOne("count")(
+ _.createStatement(s"select count(*) from $anotherTable where
$whereCondition"),
+ row => row.get(0, classOf[java.lang.Long]).longValue())
+ .futureValue
+ .contains(1)
+
+ "The R2DBC durable state store change handler" should {
+ "be invoked for first revision" in {
+ val entityType = "CustomEntity"
+ val persistenceId = nextPid(entityType)
+ val value = "Genuinely Collaborative"
+
+ store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
+
+ exists(s"pid = '$persistenceId' and rev = 1 and value = '$value'")
should be(true)
+ }
+
+ "be invoked for updates" in {
+ val entityType = "CustomEntity"
+ val persistenceId = nextPid(entityType)
+ val value = "Genuinely Collaborative"
+ store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
+
+ val updatedValue = "Open to Feedback"
+ store.upsertObject(persistenceId, 2L, updatedValue,
unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(updatedValue), 2L))
+
+ exists(s"pid = '$persistenceId' and rev = 2 and value =
'$updatedValue'") should be(true)
+ }
+
+ "be invoked for deletes" in {
+ val entityType = "CustomEntity"
+ val persistenceId = nextPid(entityType)
+ val value = "Genuinely Collaborative"
+ store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
+
+ store.deleteObject(persistenceId, 2L).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(None, 2L))
+
+ exists(s"pid = '$persistenceId' and rev = 2 and value is null") should
be(true)
+ }
+
+ "be invoked for hard deletes" in {
+ val entityType = "CustomEntity"
+ val persistenceId = nextPid(entityType)
+ val value = "Genuinely Collaborative"
+ store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
+
+ // revision 0 is for hard delete
+ store.deleteObject(persistenceId, 0L).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(None, 0L))
+
+ exists(s"pid = '$persistenceId' and rev = 0 and value is null") should
be(true)
+ }
+
+ "use same transaction" in {
+ val entityType = "CustomEntity"
+ val persistenceId = nextPid(entityType)
+ val value = "Genuinely Collaborative"
+ store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
+
+ val updatedValue = "BOOM"
+ store.upsertObject(persistenceId, 2L, updatedValue,
unusedTag).failed.futureValue.getMessage should be(
+ s"Change handler update failed for [$persistenceId] revision [2], due
to BOOM")
+ // still old value
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
+
+ exists(s"pid = '$persistenceId' and rev = 2") should be(false)
+ }
+
+ "support javadsl.ChangeHandler" in {
+ val entityType = "JavadslCustomEntity"
+ val persistenceId = nextPid(entityType)
+ val value = "Run anywhere"
+
+ store.upsertObject(persistenceId, 1L, value, unusedTag).futureValue
+ store.getObject(persistenceId).futureValue should
be(GetObjectResult(Some(value), 1L))
+
+ exists(s"pid = '$persistenceId' and rev = 1 and value = '$value'")
should be(true)
+ }
+
+ }
+
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
index 991edea..302b60c 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
@@ -17,7 +17,7 @@ import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl.{ LogCapturing,
ScalaTestWithActorTestKit }
import pekko.actor.typed.ActorSystem
import pekko.persistence.r2dbc.{ TestConfig, TestData, TestDbLifecycle }
-import pekko.persistence.r2dbc.state.scaladsl.{ DurableStateExceptionSupport,
R2dbcDurableStateStore }
+import pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.persistence.state.scaladsl.GetObjectResult
import pekko.persistence.typed.PersistenceId
@@ -248,7 +248,7 @@ class DurableStateStoreSpec
val failure =
store.deleteObject(persistenceId.id, revision = 2L).failed.futureValue
failure.getMessage should include(
- s"Failed to delete object with persistenceId [${persistenceId.id}] and
revision [2]")
+ s"Delete failed: durable state for persistence id
[${persistenceId.id}] could not be updated to revision [2]")
}
}
diff --git a/docs/src/main/paradox/projection.md
b/docs/src/main/paradox/projection.md
index 9caf0da..b9eb5e6 100644
--- a/docs/src/main/paradox/projection.md
+++ b/docs/src/main/paradox/projection.md
@@ -8,7 +8,7 @@ The source of the envelopes is from a `SourceProvider`, which
can be:
* state changes for Durable State entities via the @extref:[SourceProvider for
changesBySlices](pekko-projection:durable-state.html#sourceprovider-for-changesbyslices)
with the @ref:[changesBySlices query](query.md#changesbyslices)
* any other `SourceProvider` with supported @ref:[offset types](#offset-types)
-A @apidoc[R2dbcHandler] receives a @apidoc[R2dbcSession] instance and an
envelope. The
+A @apidoc[R2dbcHandler] receives a
@apidoc[org.apache.pekko.projection.r2dbc.*.R2dbcSession] instance and an
envelope. The
`R2dbcSession` provides the means to access an open R2DBC connection that can
be used to process the envelope.
The target database operations can be run in the same transaction as the
storage of the offset, which means
that @ref:[exactly-once](#exactly-once) processing semantics is supported. It
also offers
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]