This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 64e07dcf1b new durable state persistence exceptions (#1271)
64e07dcf1b is described below
commit 64e07dcf1b1160ef04f303d1e22c599a9bdc783f
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Apr 22 21:17:10 2024 +0200
new durable state persistence exceptions (#1271)
* new persistence exceptions
typos
scalafmt
add annotation
refactor due to review comments
Update DurableStateExceptionSpec.scala
Delete PersistenceException.scala
refactor
Update DurableStateStoreException.scala
refactor again
* refactor again
* Update DurableStateStoreException.scala
* more generic exception
* add javadoc
---
.../state/internal/DurableStateBehaviorImpl.scala | 3 +-
.../internal/DurableStateStoreException.scala | 3 +-
.../state/exception/DurableStateException.scala | 42 ++++++++++++++++
.../state/javadsl/DurableStateUpdateStore.scala | 29 ++++++++++-
.../state/scaladsl/DurableStateUpdateStore.scala | 29 ++++++++++-
.../exception/DurableStateExceptionsSpec.scala | 57 ++++++++++++++++++++++
6 files changed, 159 insertions(+), 4 deletions(-)
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
index 93cf062764..b87589e68f 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateBehaviorImpl.scala
@@ -30,6 +30,7 @@ import pekko.actor.typed.scaladsl.Behaviors
import pekko.annotation._
import pekko.persistence.RecoveryPermitter
import pekko.persistence.typed.state.scaladsl._
+import pekko.persistence.state.exception.DurableStateException
import pekko.persistence.state.scaladsl.GetObjectResult
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.SnapshotAdapter
@@ -153,7 +154,7 @@ private[pekko] final case class
DurableStateBehaviorImpl[Command, State](
}
}
- .onFailure[DurableStateStoreException](supervisionStrategy)
+ .onFailure[DurableStateException](supervisionStrategy)
}
@InternalStableApi
diff --git
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreException.scala
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreException.scala
index 3a75d03a4a..5f0dcfbcf1 100644
---
a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreException.scala
+++
b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/state/internal/DurableStateStoreException.scala
@@ -15,6 +15,7 @@ package org.apache.pekko.persistence.typed.state.internal
import org.apache.pekko
import pekko.annotation.InternalApi
+import pekko.persistence.state.exception.DurableStateException
import pekko.persistence.typed.PersistenceId
/**
@@ -24,7 +25,7 @@ import pekko.persistence.typed.PersistenceId
*/
@InternalApi
final private[pekko] class DurableStateStoreException(msg: String, cause:
Throwable)
- extends RuntimeException(msg, cause) {
+ extends DurableStateException(msg, cause) {
def this(persistenceId: PersistenceId, sequenceNr: Long, cause: Throwable) =
this(s"Failed to persist state with sequence number [$sequenceNr] for
persistenceId [${persistenceId.id}]", cause)
}
diff --git
a/persistence/src/main/scala/org/apache/pekko/persistence/state/exception/DurableStateException.scala
b/persistence/src/main/scala/org/apache/pekko/persistence/state/exception/DurableStateException.scala
new file mode 100644
index 0000000000..d1908bc742
--- /dev/null
+++
b/persistence/src/main/scala/org/apache/pekko/persistence/state/exception/DurableStateException.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.state.exception
+
+import scala.util.control.NoStackTrace
+
+/**
+ * Exception thrown when Durable State cannot be updated or deleted.
+ *
+ * @param msg the exception message
+ * @param cause the exception cause
+ * @since 1.1.0
+ */
+abstract class DurableStateException(msg: String, cause: Throwable)
+ extends RuntimeException(msg, cause) {
+ def this(msg: String) = this(msg, null)
+}
+
+/**
+ * Exception thrown when Durable State revision cannot be deleted.
+ * The revision could be out of date.
+ *
+ * @param msg the exception message
+ * @since 1.1.0
+ */
+final class DeleteRevisionException(msg: String)
+ extends DurableStateException(msg) with NoStackTrace
diff --git
a/persistence/src/main/scala/org/apache/pekko/persistence/state/javadsl/DurableStateUpdateStore.scala
b/persistence/src/main/scala/org/apache/pekko/persistence/state/javadsl/DurableStateUpdateStore.scala
index ca2a5c69c1..82b5ba8ef8 100644
---
a/persistence/src/main/scala/org/apache/pekko/persistence/state/javadsl/DurableStateUpdateStore.scala
+++
b/persistence/src/main/scala/org/apache/pekko/persistence/state/javadsl/DurableStateUpdateStore.scala
@@ -26,12 +26,39 @@ import pekko.Done
trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
/**
- * @param seqNr sequence number for optimistic locking. starts at 1.
+ * Upsert the object with the given `persistenceId` and `revision`.
+ *
+ * @param persistenceId the persistenceId of the object to upsert
+ * @param revision the revision of the object to upsert
+ * @param value the value to upsert
+ * @param tag a tag to associate with the object
+ * @return a CompletionStage that completes when the object has been upserted
*/
def upsertObject(persistenceId: String, revision: Long, value: A, tag:
String): CompletionStage[Done]
+ /**
+ * Delete the object with the given `persistenceId`. This deprecated
+ * function ignores whether the object is deleted or not.
+ *
+ * @param persistenceId the persistenceId of the object to delete
+ * @param revision the revision of the object to delete
+ * @return a CompletionStage that completes when the object has been deleted
+ */
@deprecated(message = "Use the deleteObject overload with revision
instead.", since = "Akka 2.6.20")
def deleteObject(persistenceId: String): CompletionStage[Done]
+ /**
+ * Delete the object with the given `persistenceId` and `revision`.
+ *
+ * <p>
+ * Since Pekko v1.1, if the revision does not match the current revision
+ * of the object, the delete operation will fail. The returned
CompletionStage
+ * will complete with a failed result wrapping the exception.
+ * </p>
+ *
+ * @param persistenceId the persistenceId of the object to delete
+ * @param revision the revision of the object to delete
+ * @return a CompletionStage that completes when the object has been deleted
+ */
def deleteObject(persistenceId: String, revision: Long):
CompletionStage[Done]
}
diff --git
a/persistence/src/main/scala/org/apache/pekko/persistence/state/scaladsl/DurableStateUpdateStore.scala
b/persistence/src/main/scala/org/apache/pekko/persistence/state/scaladsl/DurableStateUpdateStore.scala
index 0d44597421..be38e4d885 100644
---
a/persistence/src/main/scala/org/apache/pekko/persistence/state/scaladsl/DurableStateUpdateStore.scala
+++
b/persistence/src/main/scala/org/apache/pekko/persistence/state/scaladsl/DurableStateUpdateStore.scala
@@ -26,12 +26,39 @@ import pekko.Done
trait DurableStateUpdateStore[A] extends DurableStateStore[A] {
/**
- * @param seqNr sequence number for optimistic locking. starts at 1.
+ * Upsert the object with the given `persistenceId` and `revision`.
+ *
+ * @param persistenceId the persistenceId of the object to upsert
+ * @param revision the revision of the object to upsert
+ * @param value the value to upsert
+ * @param tag a tag to associate with the object
+ * @return a Future that completes when the object has been upserted
*/
def upsertObject(persistenceId: String, revision: Long, value: A, tag:
String): Future[Done]
+ /**
+ * Delete the object with the given `persistenceId`. This deprecated
+ * function ignores whether the object is deleted or not.
+ *
+ * @param persistenceId the persistenceId of the object to delete
+ * @param revision the revision of the object to delete
+ * @return a Future that completes when the object has been deleted
+ */
@deprecated(message = "Use the deleteObject overload with revision
instead.", since = "Akka 2.6.20")
def deleteObject(persistenceId: String): Future[Done]
+ /**
+ * Delete the object with the given `persistenceId` and `revision`.
+ *
+ * <p>
+ * Since Pekko v1.1, if the revision does not match the current revision
+ * of the object, the delete operation will fail. The returned Future
+ * will complete with a failed result wrapping the exception.
+ * </p>
+ *
+ * @param persistenceId the persistenceId of the object to delete
+ * @param revision the revision of the object to delete
+ * @return a Future that completes when the object has been deleted
+ */
def deleteObject(persistenceId: String, revision: Long): Future[Done]
}
diff --git
a/persistence/src/test/scala/org/apache/pekko/persistence/state/exception/DurableStateExceptionsSpec.scala
b/persistence/src/test/scala/org/apache/pekko/persistence/state/exception/DurableStateExceptionsSpec.scala
new file mode 100644
index 0000000000..0d6dcd7a54
--- /dev/null
+++
b/persistence/src/test/scala/org/apache/pekko/persistence/state/exception/DurableStateExceptionsSpec.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.state.exception
+
+import java.lang.invoke.{ MethodHandles, MethodType }
+
+import scala.util.Try
+import scala.util.control.NoStackTrace
+
+import org.scalatest.wordspec.AnyWordSpecLike
+import org.scalatest.matchers.should.Matchers
+
+/**
+ * Tests for [[DurableStateException]]s.
+ * <p>
+ * To avoid making Pekko persistence implementations dependent on
+ * Pekko v1.1, these exceptions will be created using MethodHandles.
+ * </p>
+ */
+class DurableStateExceptionsSpec extends AnyWordSpecLike
+ with Matchers {
+
+ private val methodHandleLookup = MethodHandles.publicLookup()
+
+ "DurableStateException support" must {
+ "allow creating DeleteRevisionException using MethodHandle" in {
+ val exceptionClassOpt: Option[Class[_]] = Try(Class.forName(
+
"org.apache.pekko.persistence.state.exception.DeleteRevisionException")).toOption
+ exceptionClassOpt should not be empty
+ val constructorOpt = exceptionClassOpt.map { clz =>
+ val mt = MethodType.methodType(classOf[Unit], classOf[String])
+ methodHandleLookup.findConstructor(clz, mt)
+ }
+ constructorOpt should not be empty
+ val constructor = constructorOpt.get
+ val ex = constructor.invoke("delete failed").asInstanceOf[Exception]
+ ex shouldBe an[DeleteRevisionException]
+ ex shouldBe an[NoStackTrace]
+ ex.getMessage shouldEqual "delete failed"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]