This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new e6625a5 bring docs up to date with akka-persistence-r2dbc 1.1.0 (#387)
e6625a5 is described below
commit e6625a540257aa8b4e68eb2237b0dba7ee7db7ce
Author: PJ Fanning <[email protected]>
AuthorDate: Sat May 23 12:08:50 2026 +0100
bring docs up to date with akka-persistence-r2dbc 1.1.0 (#387)
* docs: add missing files and sections from akka-persistence-r2dbc 1.1.x
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/cd0c2f8f-6f30-4593-8bd9-3b92c0f0a983
Co-authored-by: pjfanning <[email protected]>
* refactor
* scalafmt
* Update Dependencies.scala
* refactor
* Update BlogPostJsonColumn.java
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
docs/src/main/paradox/config.md | 52 +++++
docs/src/main/paradox/durable-state-store.md | 138 +++++++++++++-
docs/src/main/paradox/getting-started.md | 3 +
docs/src/main/paradox/index.md | 1 +
docs/src/main/paradox/journal.md | 16 ++
docs/src/main/paradox/postgres_json.md | 28 +++
docs/src/main/paradox/snapshots.md | 11 ++
.../java/jdocs/home/MultiPluginDocExample.java | 68 +++++++
docs/src/test/java/jdocs/home/state/BlogPost.java | 211 +++++++++++++++++++++
.../test/java/jdocs/home/state/BlogPostCounts.java | 84 ++++++++
.../java/jdocs/home/state/BlogPostJsonColumn.java | 49 +++++
.../test/java/jdocs/home/state/BlogPostQuery.java | 52 +++++
.../java/jdocs/home/state/BlogPostTitleColumn.java | 42 ++++
.../docs/home/query/QueryDocCompileOnly.scala | 8 +-
docs/src/test/scala/docs/home/state/BlogPost.scala | 116 +++++++++++
.../scala/docs/home/state/BlogPostCounts.scala | 84 ++++++++
.../scala/docs/home/state/BlogPostJsonColumn.scala | 41 ++++
.../test/scala/docs/home/state/BlogPostQuery.scala | 47 +++++
.../docs/home/state/BlogPostTitleColumn.scala | 50 +++++
project/Dependencies.scala | 3 +-
20 files changed, 1099 insertions(+), 5 deletions(-)
diff --git a/docs/src/main/paradox/config.md b/docs/src/main/paradox/config.md
index b6da7ae..0a51eb4 100644
--- a/docs/src/main/paradox/config.md
+++ b/docs/src/main/paradox/config.md
@@ -20,6 +20,58 @@ The following configuration can be overridden in your
`application.conf`:
@@snip [reference.conf](/core/src/main/resources/reference.conf)
{#connection-settings}
+## Journal configuration
+
+Journal configuration properties are by default defined under
`pekko.persistence.r2dbc.journal`.
+
+See @ref:[Journal plugin configuration](journal.md#configuration).
+
+## Snapshot configuration
+
+Snapshot store configuration properties are by default defined under
`pekko.persistence.r2dbc.snapshot`.
+
+See @ref:[Snapshot store plugin configuration](snapshots.md#configuration).
+
+## Durable state configuration
+
+Durable state store configuration properties are by default defined under
`pekko.persistence.r2dbc.state`.
+
+See @ref:[Durable state plugin
configuration](durable-state-store.md#configuration).
+
+## Query configuration
+
+Query configuration properties are by default defined under
`pekko.persistence.r2dbc.query`.
+
+See @ref:[Query plugin configuration](query.md#configuration).
+
+## Multiple plugins
+
+To enable the plugins to be used by default, add the following lines to your
Pekko `application.conf`:
+
+@@snip
[application.conf](/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala)
{#default-config}
+
+Note that all plugins have a shared root config section
`pekko.persistence.r2dbc`, which also contains the
+@ref:[Connection configuration](#connection-configuration) for the connection
pool that is shared for the plugins.
+
+You can use additional plugins with different configuration, for example if
more than one database is used. You would then define the configuration
+such as:
+
+@@snip
[application.conf](/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala)
{#second-config}
+
+To use the additional plugin you would @scala[define]@java[override] the
plugin id.
+
+Scala
+: @@snip
[MultiPluginDocExample.scala](/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala){#withPlugins}
+
+Java
+: @@snip
[MultiPluginDocExample.java](/docs/src/test/java/jdocs/home/MultiPluginDocExample.java)
{#withPlugins}
+
+It is similar for `DurableStateBehavior`, @scala[define
`withDurableStateStorePluginId("second-r2dbc.state")`]
+@java[override `durableStateStorePluginId` with `"second-r2dbc.state"`].
+
+For queries and Projection `SourceProvider` you would use
`"second-r2dbc.query"` instead of the default
@scala[`R2dbcReadJournal.Identifier`]
+@java[`R2dbcReadJournal.Identifier()`] (`"pekko.persistence.r2dbc.query"`).
+
## Plugin configuration at runtime
Plugin implementation supports plugin configuration at runtime.
diff --git a/docs/src/main/paradox/durable-state-store.md
b/docs/src/main/paradox/durable-state-store.md
index 397413f..0316a24 100644
--- a/docs/src/main/paradox/durable-state-store.md
+++ b/docs/src/main/paradox/durable-state-store.md
@@ -29,5 +29,139 @@ The following can be overridden in your `application.conf`
for the journal speci
## Deletes
-The journal supports deletes through hard deletes, which means the durable
state store entries are actually deleted from the database.
-There is no materialized view with a copy of the state so make sure to not
delete durable states too early if they are used from projections or queries.
+The store supports deletes through hard deletes, which means the durable state
store entries are actually deleted from
+the database. There is no materialized view with a copy of the state so make
sure to not delete durable states too early
+if they are used from projections or queries.
+
+For each persistence id one tombstone record is kept in the store when the
state of the persistence id has been deleted.
+The reason for the tombstone record is to keep track of the latest revision
number so that subsequent state changes
+don't reuse the same revision numbers that have been deleted.
+
+See the @ref[DurableStateCleanup tool](cleanup.md#durable-state-cleanup-tool)
for more information about how to delete
+state tombstone records.
+
+## State serialization
+
+The state is serialized with @extref:[Pekko
Serialization](pekko:serialization.html) and the binary representation
+is stored in the `state_payload` column. Information about which serializer
was used is stored in the
+`state_ser_id` and `state_ser_manifest` columns.
+
+For PostgreSQL the payload is stored as `BYTEA` type. Alternatively, you can
use `JSONB` column type as described in
+@ref:[PostgreSQL JSON](postgres_json.md).
+
+## Storing query representation
+
+@extref:[Durable state actors](pekko:typed/durable-state/persistence.html) can
only be looked up by their entity id.
+Additional indexed data can be stored as a query representation. You can
either store the query representation from
+an asynchronous @ref:[Projection](projection.md) or you can store it in the
same transaction as the Durable State
+upsert or delete.
+
+Advantages of storing the query representation in the same transaction as the
Durable State change:
+
+* exactly-once processing and atomic update with the Durable State change
+* no eventual consistency delay from asynchronous Projection processing
+* no need for Projection offset storage
+
+That said, for write-heavy Durable State, a Projection can have the advantage
of not impacting write latency of
+the Durable State updates. Note that updating the secondary index also has an
impact on the write latency.
+
+### Additional columns
+
+In many cases you just want a secondary index on one or a few fields other
than the entity id. For that purpose
+you can configure one or more @apidoc[AdditionalColumn] classes for an entity
type. The `AdditionalColumn` will
+extract the field from the Durable State value and define how to bind it to a
database column.
+
+The configuration:
+
+@@snip
[application.conf](/docs/src/test/scala/docs/home/state/BlogPostTitleColumn.scala)
{ #additional-column-config }
+
+For each entity type you can define a list of fully qualified class names of
`AdditionalColumn` implementations.
+The `AdditionalColumn` implementation may optionally define an ActorSystem
constructor parameter.
+
+`AdditionalColumn` for a secondary index on the title of @extref:[blog posts
(example in the Pekko
documentation)](pekko:typed/durable-state/persistence.html#changing-behavior):
+
+Scala
+: @@snip
[BlogPostTitleColumn.scala](/docs/src/test/scala/docs/home/state/BlogPostTitleColumn.scala)
{ #additional-column }
+
+Java
+: @@snip
[BlogPostTitleColumn.java](/docs/src/test/java/jdocs/home/state/BlogPostTitleColumn.java)
{ #additional-column }
+
+From the `bind` method you can return one of:
+
+* @scala[`AdditionalColumn.BindValue`]@java[`AdditionalColumn.bindValue`] -
bind a value such as a `String` or `Long` to the database column
+* @scala[`AdditionalColumn.BindNull`]@java[`AdditionalColumn.bindNull`] -
store `null` in the database column
+* @scala[`AdditionalColumn.Skip`]@java[`AdditionalColumn.skip`] - don't update
the database column for this change, keep existing value
+
+You would have to add the additional columns to the `durable_state` table
definition, and create a secondary database index.
+Unless you only have one entity type it's best to define a separate table with
the `custom-table` configuration, see
+example above. The full serialized state and the additional columns are stored
in the custom table instead of the
+default `durable_state` table. The custom table should have the same table
definition as the default
+`durable_state` table but with the extra columns added.
+
+The state can be found by the additional column and deserialized like this:
+
+Scala
+: @@snip
[BlogPostQuery.scala](/docs/src/test/scala/docs/home/state/BlogPostQuery.scala)
{ #query }
+
+Java
+: @@snip
[BlogPostQuery.java](/docs/src/test/java/jdocs/home/state/BlogPostQuery.java) {
#query }
+
+#### Additional column as PostgreSQL JSON
+
+With PostgreSQL the additional column type can be `JSONB` to take advantage of
PostgreSQL support for [JSON
Types](https://www.postgresql.org/docs/current/datatype-json.html).
+
+Then you would wrap the string or byte array representation of the JSON in
`io.r2dbc.postgresql.codec.Json` when
+binding the value.
+
+Scala
+: @@snip
[BlogPostJsonColumn.scala](/docs/src/test/scala/docs/home/state/BlogPostJsonColumn.scala)
{ #additional-column-json }
+
+Java
+: @@snip
[BlogPostJsonColumn.java](/docs/src/test/java/jdocs/home/state/BlogPostJsonColumn.java)
{ #additional-column-json }
+
+### Change handler
+
+For more advanced cases where the query representation would not fit in
@ref:[additional columns](#additional-columns)
+you can configure a @apidoc[ChangeHandler] for an entity type. The
`ChangeHandler` will be invoked for each
+Durable State change. From the `ChangeHandler` you can run database operations
in the same transaction
+as the Durable State upsert or delete.
+
+The configuration:
+
+@@snip
[application.conf](/docs/src/test/scala/docs/home/state/BlogPostCounts.scala) {
#change-handler-config }
+
+For each entity type you can define the fully qualified class name of a
`ChangeHandler` implementation.
+The `ChangeHandler` implementation may optionally define an ActorSystem
constructor parameter.
+
+`ChangeHandler` for keeping track of number of published @extref:[blog posts
(example in the Pekko
documentation)](pekko:typed/durable-state/persistence.html#changing-behavior):
+
+Scala
+: @@snip
[BlogPostCounts.scala](/docs/src/test/scala/docs/home/state/BlogPostCounts.scala)
{ #change-handler }
+
+Java
+: @@snip
[BlogPostCounts.java](/docs/src/test/java/jdocs/home/state/BlogPostCounts.java)
{ #change-handler }
+
+The @apidoc[DurableStateChange] parameter is an @apidoc[UpdatedDurableState]
when the Durable State is created or updated.
+It is a @apidoc[DeletedDurableState] when the Durable State is deleted.
+
+The @apidoc[org.apache.pekko.persistence.r2dbc.session.*.R2dbcSession]
provides the means to access an open R2DBC connection
+that can be used to process the change. The target database operations run in
the same transaction as the storage
+of the Durable State change.
+
+For a given entity, one change is processed at a time. The `ChangeHandler` will
not be invoked with the next change until after the `process` method returns
+and the returned @scala[`Future`]@java[`CompletionStage`] is completed.
+
+@@@ note { title="Concurrency semantics" }
+
+The `ChangeHandler` should be implemented as a stateless function without
mutable state because the same
+`ChangeHandler` instance may be invoked concurrently for different entities.
+For a specific entity (persistenceId) one change is processed at a time and
the `process` method will not be
+invoked with the next change for that entity until after the returned
@scala[`Future`]@java[`CompletionStage`] is completed.
+
+@@@
+
+### PostgreSQL JSON payload
+
+For PostgreSQL, an alternative to defining additional columns or change
handlers can be to store the state as JSON
+as described in @ref:[PostgreSQL JSON](postgres_json.md). Then you can add
[secondary jsonb
indexes](https://www.postgresql.org/docs/current/datatype-json.html#JSON-INDEXING)
+on the payload content for queries.
diff --git a/docs/src/main/paradox/getting-started.md
b/docs/src/main/paradox/getting-started.md
index 21bc7a8..bdafe80 100644
--- a/docs/src/main/paradox/getting-started.md
+++ b/docs/src/main/paradox/getting-started.md
@@ -61,6 +61,9 @@ Tables and indexes:
Postgres:
: @@snip [create_tables.sql](/ddl-scripts/create_tables_postgres.sql)
+Postgres JSONB:
+: @@snip [create_tables.sql](/ddl-scripts/create_tables_postgres_jsonb.sql)
+
Yugabyte:
: @@snip [create_tables.sql](/ddl-scripts/create_tables_yugabyte.sql)
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index 2d94f93..629d0c2 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -14,6 +14,7 @@ The Pekko Persistence R2DBC plugin allows for using SQL
database with R2DBC as a
* [Snapshot Plugin](snapshots.md)
* [Durable State Plugin](durable-state-store.md)
* [Query Plugin](query.md)
+* [PostgreSQL JSON](postgres_json.md)
* [Projection](projection.md)
* [Cleanup Tool](cleanup.md)
* [Migration Tool](migration.md)
diff --git a/docs/src/main/paradox/journal.md b/docs/src/main/paradox/journal.md
index 1b76ea8..c224a07 100644
--- a/docs/src/main/paradox/journal.md
+++ b/docs/src/main/paradox/journal.md
@@ -35,3 +35,19 @@ The following can be overridden in your `application.conf`
for the journal speci
The journal supports deletes through hard deletes, which means the journal
entries are actually deleted from the database.
There is no materialized view with a copy of the event so make sure to not
delete events too early if they are used from projections or queries.
+
+For each persistence id one tombstone record is kept in the event journal when
all events of a persistence id have been
+deleted. The reason for the tombstone record is to keep track of the latest
sequence number so that subsequent events
+don't reuse the same sequence numbers that have been deleted.
+
+See the @ref[EventSourcedCleanup tool](cleanup.md#event-sourced-cleanup-tool)
for more information about how to delete
+events, snapshots and tombstone records.
+
+## Event serialization
+
+The events are serialized with @extref:[Pekko
Serialization](pekko:serialization.html) and the binary representation
+is stored in the `event_payload` column. Information about which serializer
was used is stored in the
+`event_ser_id` and `event_ser_manifest` columns.
+
+For PostgreSQL the payload is stored as `BYTEA` type. Alternatively, you can
use `JSONB` column type as described in
+@ref:[PostgreSQL JSON](postgres_json.md).
diff --git a/docs/src/main/paradox/postgres_json.md
b/docs/src/main/paradox/postgres_json.md
new file mode 100644
index 0000000..f4d7060
--- /dev/null
+++ b/docs/src/main/paradox/postgres_json.md
@@ -0,0 +1,28 @@
+# PostgreSQL JSON
+
+By default, the serialized event, snapshot and durable state payloads are
stored in `BYTEA` columns. Alternatively,
+you can use `JSONB` column type to take advantage of PostgreSQL support for
[JSON Types](https://www.postgresql.org/docs/current/datatype-json.html).
+For example, you can then add secondary jsonb indexes on the payload content
for queries.
+
+To enable `JSONB` payloads you need the following.
+
+1. Create the schema as shown in the Postgres JSONB tab in @ref:[Creating the
schema](getting-started.md#schema).
+
+1. Define configuration:
+ ```
+ pekko.persistence.r2dbc {
+ journal.payload-column-type = JSONB
+ snapshot.payload-column-type = JSONB
+ state.payload-column-type = JSONB
+ }
+ ```
+
+1. Serialize the event, snapshot and durable state payloads as JSON bytes.
+
+For the serialization you can use:
+* @extref:[Pekko Serialization with Jackson](pekko:serialization-jackson.html)
with JSON format.
+ * Make sure to disable
@extref:[compression](pekko:serialization-jackson.html#compression) with
`pekko.serialization.jackson.jackson-json.compression.algorithm = off`
+* Plain strings in JSON format.
+* A custom Pekko serializer whose binary format is a UTF-8 encoded JSON
string.
+
+Note that you can enable this feature selectively for the event journal,
snapshot, and durable state.
diff --git a/docs/src/main/paradox/snapshots.md
b/docs/src/main/paradox/snapshots.md
index 4e1f1ab..b565197 100644
--- a/docs/src/main/paradox/snapshots.md
+++ b/docs/src/main/paradox/snapshots.md
@@ -37,3 +37,14 @@ If a `keepNSnapshots > 1` is specified for an
`EventSourcedBehavior` that settin
The reason for this is that there is no real benefit to keep multiple
snapshots around on a relational
database with a high consistency.
+
+See also @ref[EventSourcedCleanup tool](cleanup.md#event-sourced-cleanup-tool).
+
+## Snapshot serialization
+
+The state is serialized with @extref:[Pekko
Serialization](pekko:serialization.html) and the binary snapshot representation
+is stored in the `snapshot` column. Information about which serializer
was used is stored in the
+`ser_id` and `ser_manifest` columns.
+
+For PostgreSQL the payload is stored as `BYTEA` type. Alternatively, you can
use `JSONB` column type as described in
+@ref:[PostgreSQL JSON](postgres_json.md).
diff --git a/docs/src/test/java/jdocs/home/MultiPluginDocExample.java
b/docs/src/test/java/jdocs/home/MultiPluginDocExample.java
new file mode 100644
index 0000000..6210cf0
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/MultiPluginDocExample.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/**
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home;
+
+import org.apache.pekko.persistence.typed.PersistenceId;
+import org.apache.pekko.persistence.typed.javadsl.CommandHandler;
+import org.apache.pekko.persistence.typed.javadsl.EventHandler;
+import org.apache.pekko.persistence.typed.javadsl.EventSourcedBehavior;
+
+public class MultiPluginDocExample {
+
+ static
+ // #withPlugins
+ public class MyEntity extends EventSourcedBehavior<MyEntity.Command,
MyEntity.Event, MyEntity.State> {
+ // #withPlugins
+ public MyEntity(PersistenceId persistenceId) {
+ super(persistenceId);
+ }
+
+ interface Command {
+ }
+
+ interface Event {
+ }
+
+ static class State {
+ }
+
+ @Override
+ public State emptyState() {
+ return new State();
+ }
+
+ @Override
+ public CommandHandler<Command, Event, State> commandHandler() {
+ return newCommandHandlerBuilder().build();
+ }
+
+ @Override
+ public EventHandler<State, Event> eventHandler() {
+ return newEventHandlerBuilder().build();
+ }
+
+ // #withPlugins
+ @Override
+ public String journalPluginId() {
+ return "second-r2dbc.journal";
+ }
+
+ @Override
+ public String snapshotPluginId() {
+ return "second-r2dbc.snapshot";
+ }
+ }
+ // #withPlugins
+
+}
diff --git a/docs/src/test/java/jdocs/home/state/BlogPost.java
b/docs/src/test/java/jdocs/home/state/BlogPost.java
new file mode 100644
index 0000000..f896340
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/state/BlogPost.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.state;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorRef;
+import org.apache.pekko.actor.typed.Behavior;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.persistence.typed.PersistenceId;
+import org.apache.pekko.persistence.typed.state.javadsl.CommandHandler;
+import org.apache.pekko.persistence.typed.state.javadsl.CommandHandlerBuilder;
+import org.apache.pekko.persistence.typed.state.javadsl.DurableStateBehavior;
+import org.apache.pekko.persistence.typed.state.javadsl.Effect;
+
+public class BlogPost
+ extends DurableStateBehavior<
+ BlogPost.Command, BlogPost.State> {
+ // commands and state as in above snippets
+
+ interface State {}
+
+ enum BlankState implements State {
+ INSTANCE
+ }
+
+ static class DraftState implements State {
+ final PostContent content;
+
+ DraftState(PostContent content) {
+ this.content = content;
+ }
+
+ DraftState withContent(PostContent newContent) {
+ return new DraftState(newContent);
+ }
+
+ DraftState withBody(String newBody) {
+ return withContent(new PostContent(postId(), content.title, newBody));
+ }
+
+ String postId() {
+ return content.postId;
+ }
+ }
+
+ static class PublishedState implements State {
+ final PostContent content;
+
+ PublishedState(PostContent content) {
+ this.content = content;
+ }
+
+ PublishedState withContent(PostContent newContent) {
+ return new PublishedState(newContent);
+ }
+
+ PublishedState withBody(String newBody) {
+ return withContent(new PostContent(postId(), content.title, newBody));
+ }
+
+ String postId() {
+ return content.postId;
+ }
+ }
+
+ public interface Command {}
+ public static class AddPost implements Command {
+ final PostContent content;
+ final ActorRef<AddPostDone> replyTo;
+
+ public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) {
+ this.content = content;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static class AddPostDone implements Command {
+ final String postId;
+
+ public AddPostDone(String postId) {
+ this.postId = postId;
+ }
+ }
+ public static class GetPost implements Command {
+ final ActorRef<PostContent> replyTo;
+
+ public GetPost(ActorRef<PostContent> replyTo) {
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static class ChangeBody implements Command {
+ final String newBody;
+ final ActorRef<Done> replyTo;
+
+ public ChangeBody(String newBody, ActorRef<Done> replyTo) {
+ this.newBody = newBody;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static class Publish implements Command {
+ final ActorRef<Done> replyTo;
+
+ public Publish(ActorRef<Done> replyTo) {
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static class PostContent implements Command {
+ final String postId;
+ final String title;
+ final String body;
+
+ public PostContent(String postId, String title, String body) {
+ this.postId = postId;
+ this.title = title;
+ this.body = body;
+ }
+ }
+
+ public static Behavior<Command> create(String entityId, PersistenceId
persistenceId) {
+ return Behaviors.setup(
+ context -> {
+ context.getLog().info("Starting BlogPostEntityDurableState {}",
entityId);
+ return new BlogPost(persistenceId);
+ });
+ }
+
+ private BlogPost(PersistenceId persistenceId) {
+ super(persistenceId);
+ }
+
+ @Override
+ public State emptyState() {
+ return BlankState.INSTANCE;
+ }
+
+ @Override
+ public CommandHandler<Command, State> commandHandler() {
+ CommandHandlerBuilder<Command, State> builder = newCommandHandlerBuilder();
+
+ builder.forStateType(BlankState.class).onCommand(AddPost.class,
this::onAddPost);
+
+ builder
+ .forStateType(DraftState.class)
+ .onCommand(ChangeBody.class, this::onChangeBody)
+ .onCommand(Publish.class, this::onPublish)
+ .onCommand(GetPost.class, this::onGetPost);
+
+ builder
+ .forStateType(PublishedState.class)
+ .onCommand(ChangeBody.class, this::onChangeBody)
+ .onCommand(GetPost.class, this::onGetPost);
+
+ builder.forAnyState().onCommand(AddPost.class, (state, cmd) ->
Effect().unhandled());
+
+ return builder.build();
+ }
+
+ private Effect<State> onAddPost(AddPost cmd) {
+ return Effect()
+ .persist(new DraftState(cmd.content))
+ .thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
+ }
+
+ private Effect<State> onChangeBody(DraftState state, ChangeBody cmd) {
+ return Effect()
+ .persist(state.withBody(cmd.newBody))
+ .thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
+ }
+
+ private Effect<State> onChangeBody(PublishedState state, ChangeBody cmd) {
+ return Effect()
+ .persist(state.withBody(cmd.newBody))
+ .thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
+ }
+
+ private Effect<State> onPublish(DraftState state, Publish cmd) {
+ return Effect()
+ .persist(new PublishedState(state.content))
+ .thenRun(
+ () -> {
+ System.out.println("Blog post published: " + state.postId());
+ cmd.replyTo.tell(Done.getInstance());
+ });
+ }
+
+ private Effect<State> onGetPost(DraftState state, GetPost cmd) {
+ cmd.replyTo.tell(state.content);
+ return Effect().none();
+ }
+
+ private Effect<State> onGetPost(PublishedState state, GetPost cmd) {
+ cmd.replyTo.tell(state.content);
+ return Effect().none();
+ }
+
+ // a DurableStateBehavior has no eventHandler; the commandHandler is defined above
+}
diff --git a/docs/src/test/java/jdocs/home/state/BlogPostCounts.java
b/docs/src/test/java/jdocs/home/state/BlogPostCounts.java
new file mode 100644
index 0000000..df4c1d3
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/state/BlogPostCounts.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/**
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.state;
+
+// #change-handler
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.persistence.Persistence;
+import org.apache.pekko.persistence.query.DeletedDurableState;
+import org.apache.pekko.persistence.query.DurableStateChange;
+import org.apache.pekko.persistence.query.UpdatedDurableState;
+import org.apache.pekko.persistence.r2dbc.session.javadsl.R2dbcSession;
+import org.apache.pekko.persistence.r2dbc.state.javadsl.ChangeHandler;
+import io.r2dbc.spi.Statement;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * Keep track of number of published blog posts. Count per slice.
+ *
+ * <pre>
+ * CREATE TABLE post_count (slice INT NOT NULL, cnt BIGINT NOT NULL, PRIMARY
KEY(slice));
+ * </pre>
+ */
+public class BlogPostCounts implements ChangeHandler<BlogPost.State> {
+
+ private final ActorSystem<?> system;
+
+ private final String incrementSql =
+ "INSERT INTO post_count (slice, cnt) VALUES ($1, 1) " +
+ "ON CONFLICT (slice) DO UPDATE SET cnt = post_count.cnt + 1";
+
+ private final String decrementSql =
+ "UPDATE post_count SET cnt = cnt - 1 WHERE slice = $1";
+
+ public BlogPostCounts(ActorSystem<?> system) {
+ this.system = system;
+ }
+
+ @Override
+ public CompletionStage<Done> process(R2dbcSession session,
DurableStateChange<BlogPost.State> change) {
+ if (change instanceof UpdatedDurableState updatedDurableState) {
+ return processUpdate(session, updatedDurableState);
+ } else if (change instanceof DeletedDurableState deletedDurableState) {
+ return processDelete(session, deletedDurableState);
+ } else {
+ throw new IllegalArgumentException("Unexpected change " +
change.getClass().getName());
+ }
+ }
+
+ private CompletionStage<Done> processUpdate(R2dbcSession session,
UpdatedDurableState<BlogPost.State> upd) {
+ if (upd.value() instanceof BlogPost.PublishedState) {
+ int slice =
Persistence.get(system).sliceForPersistenceId(upd.persistenceId());
+ Statement stmt = session
+ .createStatement(incrementSql)
+ .bind(0, slice);
+ return session.updateOne(stmt).thenApply(count -> Done.getInstance());
+ } else {
+ return CompletableFuture.completedFuture(Done.getInstance());
+ }
+ }
+
+ private CompletionStage<Done> processDelete(R2dbcSession session,
DeletedDurableState<BlogPost.State> del) {
+ int slice =
Persistence.get(system).sliceForPersistenceId(del.persistenceId());
+ Statement stmt = session
+ .createStatement(decrementSql)
+ .bind(0, slice);
+ return session.updateOne(stmt).thenApply(count -> Done.getInstance());
+ }
+
+}
+// #change-handler
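A note on the `incrementSql` upsert in `BlogPostCounts`: in PostgreSQL's `ON CONFLICT ... DO UPDATE`, `excluded` names the row proposed for insertion, while the table name refers to the row already stored. The sketch below only illustrates that difference in plain Java, with an in-memory map standing in for the `post_count` table. The class and method names are hypothetical, and nothing here touches R2DBC or a database.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: models the two upsert variants with a map.
class UpsertCounterSketch {
  // Stand-in for the post_count table: slice -> cnt.
  private final Map<Integer, Long> postCount = new HashMap<>();

  // Models "... VALUES (slice, 1) ON CONFLICT (slice) DO UPDATE SET cnt = post_count.cnt + 1":
  // on conflict the new value is derived from the row already stored.
  public void incrementFromExisting(int slice) {
    postCount.merge(slice, 1L, (existing, proposed) -> existing + 1);
  }

  // Models "... DO UPDATE SET cnt = excluded.cnt + 1":
  // on conflict the new value is derived from the proposed row, whose cnt is always 1.
  public void incrementFromExcluded(int slice) {
    postCount.merge(slice, 1L, (existing, proposed) -> proposed + 1);
  }

  public long count(int slice) {
    return postCount.getOrDefault(slice, 0L);
  }

  public static void main(String[] args) {
    UpsertCounterSketch fromExisting = new UpsertCounterSketch();
    UpsertCounterSketch fromExcluded = new UpsertCounterSketch();
    for (int i = 0; i < 3; i++) {
      fromExisting.incrementFromExisting(7);
      fromExcluded.incrementFromExcluded(7);
    }
    System.out.println("existing-based: " + fromExisting.count(7)); // prints 3
    System.out.println("excluded-based: " + fromExcluded.count(7)); // prints 2
  }
}
```

With three increments for the same slice, the first variant counts to 3 while the second stays at 2, because the proposed row always carries `cnt = 1`.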
diff --git a/docs/src/test/java/jdocs/home/state/BlogPostJsonColumn.java
b/docs/src/test/java/jdocs/home/state/BlogPostJsonColumn.java
new file mode 100644
index 0000000..019f562
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/state/BlogPostJsonColumn.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/**
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.state;
+
+import org.apache.pekko.persistence.r2dbc.state.javadsl.AdditionalColumn;
+// #additional-column-json
+import io.r2dbc.postgresql.codec.Json;
+
+public class BlogPostJsonColumn extends AdditionalColumn<BlogPost.State, Json>
{
+ @Override
+ public Class<Json> fieldClass() {
+ return Json.class;
+ }
+
+ @Override
+ public String columnName() {
+ return "query_json";
+ }
+
+ @Override
+ public Binding<Json> bind(Upsert<BlogPost.State> upsert) {
+ BlogPost.State state = upsert.value();
+ if (state instanceof BlogPost.DraftState s) {
+ // a json library would be used here
+ String jsonString = "{\"title\": \"" + s.content.title + "\",
\"published\": false}";
+ Json json = Json.of(jsonString);
+ return AdditionalColumn.bindValue(json);
+ } else if (state instanceof BlogPost.PublishedState s) {
+ // a json library would be used here
+ String jsonString = "{\"title\": \"" + s.content.title + "\",
\"published\": true}";
+ Json json = Json.of(jsonString);
+ return AdditionalColumn.bindValue(json);
+ } else {
+ return AdditionalColumn.skip();
+ }
+ }
+}
+// #additional-column-json
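The comment `a json library would be used here` matters in practice: plain concatenation yields invalid JSON as soon as a title contains a double quote or a backslash. As a minimal sketch, assuming no JSON library is available, the hypothetical helper below escapes a string according to the JSON string grammar before embedding it. `JsonStringSketch`, `quote` and `blogPostJson` are illustrative names, not part of Pekko or r2dbc-postgresql.

```java
// Hypothetical illustration only: a tiny JSON string escaper.
class JsonStringSketch {

  // Returns the JSON string literal (including surrounding quotes) for s.
  public static String quote(String s) {
    StringBuilder sb = new StringBuilder(s.length() + 2).append('"');
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      switch (c) {
        case '"':  sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        case '\n': sb.append("\\n"); break;
        case '\r': sb.append("\\r"); break;
        case '\t': sb.append("\\t"); break;
        default:
          if (c < 0x20) {
            // Remaining control characters must be written as \\uXXXX escapes.
            sb.append(String.format("\\u%04x", (int) c));
          } else {
            sb.append(c);
          }
      }
    }
    return sb.append('"').toString();
  }

  // Builds the same document shape as the example, with the title escaped.
  public static String blogPostJson(String title, boolean published) {
    return "{\"title\": " + quote(title) + ", \"published\": " + published + "}";
  }

  public static void main(String[] args) {
    System.out.println(blogPostJson("Say \"hi\"", true));
  }
}
```

The resulting string could then be passed to `Json.of(...)` as in the example. A real JSON library remains the better choice for anything beyond a flat document like this one.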
diff --git a/docs/src/test/java/jdocs/home/state/BlogPostQuery.java
b/docs/src/test/java/jdocs/home/state/BlogPostQuery.java
new file mode 100644
index 0000000..5b04166
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/state/BlogPostQuery.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/**
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.state;
+
+// #query
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.persistence.r2dbc.session.javadsl.R2dbcSession;
+import org.apache.pekko.serialization.SerializationExtension;
+import io.r2dbc.spi.Statement;
+
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+
+public class BlogPostQuery {
+ private final ActorSystem<?> system;
+
+ public BlogPostQuery(ActorSystem<?> system) {
+ this.system = system;
+ }
+
+ private final String findByTitleSql =
+ "SELECT state_ser_id, state_ser_manifest, state_payload " +
+ "FROM durable_state_blog_post " +
+ "WHERE title = $1";
+
+ public CompletionStage<List<BlogPost.State>> findByTitle(String title) {
+ return R2dbcSession.withSession(system, session -> {
+ Statement stmt = session.createStatement(findByTitleSql).bind(0, title);
+ return session.select(stmt, row -> {
+ int serializerId = row.get("state_ser_id", Integer.class);
+ String serializerManifest = row.get("state_ser_manifest", String.class);
+ byte[] payload = row.get("state_payload", byte[].class);
+ BlogPost.State state = (BlogPost.State) SerializationExtension.get(system)
+ .deserialize(payload, serializerId, serializerManifest).get();
+ return state;
+ });
+ });
+ }
+
+}
+// #query
diff --git a/docs/src/test/java/jdocs/home/state/BlogPostTitleColumn.java b/docs/src/test/java/jdocs/home/state/BlogPostTitleColumn.java
new file mode 100644
index 0000000..a3f1402
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/state/BlogPostTitleColumn.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/**
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.state;
+
+// #additional-column
+import org.apache.pekko.persistence.r2dbc.state.javadsl.AdditionalColumn;
+
+public class BlogPostTitleColumn extends AdditionalColumn<BlogPost.State, String> {
+ @Override
+ public Class<String> fieldClass() {
+ return String.class;
+ }
+
+ @Override
+ public String columnName() {
+ return "title";
+ }
+
+ @Override
+ public Binding<String> bind(Upsert<BlogPost.State> upsert) {
+ BlogPost.State state = upsert.value();
+ if (state.equals(BlogPost.BlankState.INSTANCE)) {
+ return AdditionalColumn.bindNull();
+ } else if (state instanceof BlogPost.DraftState draft) {
+ return AdditionalColumn.bindValue(draft.content.title);
+ } else {
+ return AdditionalColumn.skip();
+ }
+ }
+}
+// #additional-column
diff --git a/docs/src/test/scala/docs/home/query/QueryDocCompileOnly.scala b/docs/src/test/scala/docs/home/query/QueryDocCompileOnly.scala
index d568e45..cfc725b 100644
--- a/docs/src/test/scala/docs/home/query/QueryDocCompileOnly.scala
+++ b/docs/src/test/scala/docs/home/query/QueryDocCompileOnly.scala
@@ -7,6 +7,10 @@
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
package docs.home.query
import org.apache.pekko
@@ -50,7 +54,7 @@ object QueryDocCompileOnly {
{
// #currentEventsBySlices
- import org.apache.pekko.persistence.query.typed.EventEnvelope
+ import pekko.persistence.query.typed.EventEnvelope
 // Split the slices into 4 ranges
val numberOfSliceRanges: Int = 4
@@ -71,7 +75,7 @@ object QueryDocCompileOnly {
{
// #currentChangesBySlices
- import org.apache.pekko.persistence.query.UpdatedDurableState
+ import pekko.persistence.query.UpdatedDurableState
 // Split the slices into 4 ranges
val numberOfSliceRanges: Int = 4
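
Reviewer note: the two snippets above divide the slices into 4 ranges. As a rough sketch of what that partitioning means, the Java below assumes 1024 slices in total and a slice derived from the absolute value of the persistence id's hash code modulo the slice count. Both are assumptions about Pekko Persistence stated here for illustration, and `SliceRangeSketch`, `sliceFor` and `sliceRanges` are hypothetical names, not library API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of slice partitioning; not the Pekko implementation.
final class SliceRangeSketch {

    // Assumption: the total number of slices is fixed at 1024.
    static final int NUMBER_OF_SLICES = 1024;

    // Assumption: a slice is derived from the persistence id's hash code.
    static int sliceFor(String persistenceId) {
        return Math.abs(persistenceId.hashCode() % NUMBER_OF_SLICES);
    }

    // Splits [0, NUMBER_OF_SLICES) into equally sized inclusive ranges {min, max}.
    static List<int[]> sliceRanges(int numberOfRanges) {
        if (numberOfRanges <= 0 || NUMBER_OF_SLICES % numberOfRanges != 0) {
            throw new IllegalArgumentException(
                "numberOfRanges must evenly divide " + NUMBER_OF_SLICES);
        }
        int size = NUMBER_OF_SLICES / numberOfRanges;
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < numberOfRanges; i++) {
            ranges.add(new int[] {i * size, i * size + size - 1});
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] range : sliceRanges(4)) {
            System.out.println(range[0] + " to " + range[1]);
        }
    }
}
```

With 4 ranges, each range covers 256 consecutive slices, so one query per range can run in parallel without overlap.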
diff --git a/docs/src/test/scala/docs/home/state/BlogPost.scala b/docs/src/test/scala/docs/home/state/BlogPost.scala
new file mode 100644
index 0000000..e8475fd
--- /dev/null
+++ b/docs/src/test/scala/docs/home/state/BlogPost.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.state
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.Behavior
+import pekko.actor.typed.scaladsl.Behaviors
+import pekko.pattern.StatusReply
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.state.scaladsl.DurableStateBehavior
+import pekko.persistence.typed.state.scaladsl.Effect
+
+object BlogPost {
+ val EntityTypeName = "BlogPost"
+
+ sealed trait State
+
+ case object BlankState extends State
+
+ final case class DraftState(content: PostContent) extends State {
+ def withBody(newBody: String): DraftState =
+ copy(content = content.copy(body = newBody))
+
+ def postId: String = content.postId
+ }
+
+ final case class PublishedState(content: PostContent) extends State {
+ def postId: String = content.postId
+ }
+
+ sealed trait Command
+ final case class AddPost(content: PostContent, replyTo: ActorRef[StatusReply[AddPostDone]]) extends Command
+ final case class AddPostDone(postId: String)
+ final case class GetPost(replyTo: ActorRef[PostContent]) extends Command
+ final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends Command
+ final case class Publish(replyTo: ActorRef[Done]) extends Command
+ final case class PostContent(postId: String, title: String, body: String)
+
+ def apply(entityId: String, persistenceId: PersistenceId): Behavior[Command] = {
+ Behaviors.setup { context =>
+ context.log.info("Starting BlogPostEntityDurableState {}", entityId)
+ DurableStateBehavior[Command, State](persistenceId, emptyState = BlankState, commandHandler)
+ }
+ }
+
+ private val commandHandler: (State, Command) => Effect[State] = { (state, command) =>
+ state match {
+
+ case BlankState =>
+ command match {
+ case cmd: AddPost => addPost(cmd)
+ case _ => Effect.unhandled
+ }
+
+ case draftState: DraftState =>
+ command match {
+ case cmd: ChangeBody => changeBody(draftState, cmd)
+ case Publish(replyTo) => publish(draftState, replyTo)
+ case GetPost(replyTo) => getPost(draftState, replyTo)
+ case AddPost(_, replyTo) =>
+ Effect.unhandled[State].thenRun(_ => replyTo ! StatusReply.Error("Cannot add post while in draft state"))
+ }
+
+ case publishedState: PublishedState =>
+ command match {
+ case GetPost(replyTo) => getPost(publishedState, replyTo)
+ case AddPost(_, replyTo) =>
+ Effect.unhandled[State].thenRun(_ => replyTo ! StatusReply.Error("Cannot add post, already published"))
+ case _ => Effect.unhandled
+ }
+ }
+ }
+
+ private def addPost(cmd: AddPost): Effect[State] = {
+ Effect.persist(DraftState(cmd.content)).thenRun { _ =>
+ cmd.replyTo ! StatusReply.Success(AddPostDone(cmd.content.postId))
+ }
+ }
+
+ private def changeBody(state: DraftState, cmd: ChangeBody): Effect[State] = {
+ Effect.persist(state.withBody(cmd.newBody)).thenRun { _ =>
+ cmd.replyTo ! Done
+ }
+ }
+
+ private def publish(state: DraftState, replyTo: ActorRef[Done]): Effect[State] = {
+ Effect.persist(PublishedState(state.content)).thenRun { _ =>
+ println(s"Blog post ${state.postId} was published")
+ replyTo ! Done
+ }
+ }
+
+ private def getPost(state: DraftState, replyTo: ActorRef[PostContent]): Effect[State] = {
+ replyTo ! state.content
+ Effect.none
+ }
+
+ private def getPost(state: PublishedState, replyTo: ActorRef[PostContent]): Effect[State] = {
+ replyTo ! state.content
+ Effect.none
+ }
+
+}
diff --git a/docs/src/test/scala/docs/home/state/BlogPostCounts.scala b/docs/src/test/scala/docs/home/state/BlogPostCounts.scala
new file mode 100644
index 0000000..fb5a810
--- /dev/null
+++ b/docs/src/test/scala/docs/home/state/BlogPostCounts.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.state
+
+// #change-handler
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.Persistence
+import pekko.persistence.query.DeletedDurableState
+import pekko.persistence.query.DurableStateChange
+import pekko.persistence.query.UpdatedDurableState
+import pekko.persistence.r2dbc.session.scaladsl.R2dbcSession
+import pekko.persistence.r2dbc.state.scaladsl.ChangeHandler
+
+// #change-handler
+
+/* config:
+// #change-handler-config
+pekko.persistence.r2dbc.state {
+ change-handler {
+ "BlogPost" = "docs.home.state.BlogPostCounts"
+ }
+}
+// #change-handler-config
+ */
+
+// #change-handler
+/**
+ * Keep track of number of published blog posts. Count per slice.
+ *
+ * {{{
+ * CREATE TABLE post_count (slice INT NOT NULL, cnt BIGINT NOT NULL, PRIMARY KEY(slice));
+ * }}}
+ */
+class BlogPostCounts(system: ActorSystem[_]) extends ChangeHandler[BlogPost.State] {
+
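+ // `excluded.cnt` is the value proposed by the INSERT (always 1), so the
+ // stored count must be read from the target table to increment it.
+ private val incrementSql =
+ "INSERT INTO post_count (slice, cnt) VALUES ($1, 1) " +
+ "ON CONFLICT (slice) DO UPDATE SET cnt = post_count.cnt + 1"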
+
+ private val decrementSql =
+ "UPDATE post_count SET cnt = cnt - 1 WHERE slice = $1"
+
+ private implicit val ec: ExecutionContext = system.executionContext
+
+ override def process(session: R2dbcSession, change: DurableStateChange[BlogPost.State]): Future[Done] = {
+ change match {
+ case upd: UpdatedDurableState[BlogPost.State] =>
+ upd.value match {
+ case _: BlogPost.PublishedState =>
+ val slice = Persistence(system).sliceForPersistenceId(upd.persistenceId)
+ val stmt = session
+ .createStatement(incrementSql)
+ .bind(0, slice)
+ session.updateOne(stmt).map(_ => Done)
+ case _ =>
+ Future.successful(Done)
+ }
+
+ case del: DeletedDurableState[BlogPost.State] =>
+ val slice = Persistence(system).sliceForPersistenceId(del.persistenceId)
+ val stmt = session
+ .createStatement(decrementSql)
+ .bind(0, slice)
+ session.updateOne(stmt).map(_ => Done)
+ }
+ }
+}
+// #change-handler
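
Reviewer note: the intended behaviour of the two SQL statements in the handler above is a per-slice counter. An increment inserts 1 for a slice with no row yet and otherwise adds 1 to the stored count, while a decrement only touches an existing row. The following in-memory Java sketch models that behaviour under those assumptions; `PostCountSketch` and its methods are hypothetical names and do not talk to a database.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the post_count table; for illustration only.
final class PostCountSketch {

    // Maps slice number to count, standing in for the (slice, cnt) rows.
    private final Map<Integer, Long> countBySlice = new HashMap<>();

    // Models INSERT ... ON CONFLICT (slice) DO UPDATE:
    // a new slice starts at 1, an existing slice gets stored count + 1.
    void increment(int slice) {
        countBySlice.merge(slice, 1L, Long::sum);
    }

    // Models UPDATE ... SET cnt = cnt - 1 WHERE slice = ?:
    // a slice without a row is left untouched.
    void decrement(int slice) {
        countBySlice.computeIfPresent(slice, (key, value) -> value - 1);
    }

    // Returns the stored count, or 0 when the slice has no row.
    long count(int slice) {
        return countBySlice.getOrDefault(slice, 0L);
    }

    public static void main(String[] args) {
        PostCountSketch counts = new PostCountSketch();
        counts.increment(7);
        counts.increment(7);
        counts.decrement(7);
        System.out.println(counts.count(7));
    }
}
```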
diff --git a/docs/src/test/scala/docs/home/state/BlogPostJsonColumn.scala b/docs/src/test/scala/docs/home/state/BlogPostJsonColumn.scala
new file mode 100644
index 0000000..7a3b075
--- /dev/null
+++ b/docs/src/test/scala/docs/home/state/BlogPostJsonColumn.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.state
+
+import org.apache.pekko
+import pekko.persistence.r2dbc.state.scaladsl.AdditionalColumn
+// #additional-column-json
+import io.r2dbc.postgresql.codec.Json
+
+class BlogPostJsonColumn extends AdditionalColumn[BlogPost.State, Json] {
+
+ override val columnName: String = "query_json"
+
+ override def bind(upsert: AdditionalColumn.Upsert[BlogPost.State]): AdditionalColumn.Binding[Json] =
+ upsert.value match {
+ case s: BlogPost.DraftState =>
+ // a json library would be used here
+ val jsonString = s"""{"title": "${s.content.title}", "published": false}"""
+ val json = Json.of(jsonString)
+ AdditionalColumn.BindValue(json)
+ case s: BlogPost.PublishedState =>
+ // a json library would be used here
+ val jsonString = s"""{"title": "${s.content.title}", "published": true}"""
+ val json = Json.of(jsonString)
+ AdditionalColumn.BindValue(json)
+ case _ =>
+ AdditionalColumn.Skip
+ }
+}
+// #additional-column-json
diff --git a/docs/src/test/scala/docs/home/state/BlogPostQuery.scala b/docs/src/test/scala/docs/home/state/BlogPostQuery.scala
new file mode 100644
index 0000000..fefe055
--- /dev/null
+++ b/docs/src/test/scala/docs/home/state/BlogPostQuery.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.state
+
+// #query
+import scala.concurrent.Future
+
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.r2dbc.session.scaladsl.R2dbcSession
+import pekko.serialization.SerializationExtension
+
+class BlogPostQuery(system: ActorSystem[_]) {
+
+ private val findByTitleSql =
+ "SELECT state_ser_id, state_ser_manifest, state_payload " +
+ "FROM durable_state_blog_post " +
+ "WHERE title = $1"
+
+ def findByTitle(title: String): Future[IndexedSeq[BlogPost.State]] = {
+ R2dbcSession.withSession(system) { session =>
+ session.select(session.createStatement(findByTitleSql).bind(0, title)) { row =>
+ val serializerId = row.get("state_ser_id", classOf[java.lang.Integer])
+ val serializerManifest = row.get("state_ser_manifest", classOf[String])
+ val payload = row.get("state_payload", classOf[Array[Byte]])
+ val state = SerializationExtension(system)
+ .deserialize(payload, serializerId, serializerManifest)
+ .get
+ .asInstanceOf[BlogPost.State]
+ state
+ }
+ }
+ }
+
+}
+// #query
diff --git a/docs/src/test/scala/docs/home/state/BlogPostTitleColumn.scala b/docs/src/test/scala/docs/home/state/BlogPostTitleColumn.scala
new file mode 100644
index 0000000..334882f
--- /dev/null
+++ b/docs/src/test/scala/docs/home/state/BlogPostTitleColumn.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.state
+
+// #additional-column
+import org.apache.pekko
+import pekko.persistence.r2dbc.state.scaladsl.AdditionalColumn
+
+// #additional-column
+
+/* config:
+// #additional-column-config
+pekko.persistence.r2dbc.state {
+ additional-columns {
+ "BlogPost" = ["docs.home.state.BlogPostTitleColumn"]
+ }
+ custom-table {
+ "BlogPost" = durable_state_blog_post
+ }
+}
+// #additional-column-config
+ */
+
+// #additional-column
+class BlogPostTitleColumn extends AdditionalColumn[BlogPost.State, String] {
+
+ override val columnName: String = "title"
+
+ override def bind(upsert: AdditionalColumn.Upsert[BlogPost.State]): AdditionalColumn.Binding[String] =
+ upsert.value match {
+ case BlogPost.BlankState =>
+ AdditionalColumn.BindNull
+ case s: BlogPost.DraftState =>
+ AdditionalColumn.BindValue(s.content.title)
+ case _: BlogPost.PublishedState =>
+ AdditionalColumn.Skip
+ }
+}
+// #additional-column
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index ecf61ae..c6b42ed 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -84,7 +84,8 @@ object Dependencies {
val docs = Seq(
TestDeps.pekkoPersistenceTyped,
- TestDeps.pekkoShardingTyped)
+ TestDeps.pekkoShardingTyped,
+ r2dbcPostgres % "provided,test")
val pekkoTestDependencyOverrides = Seq(
TestDeps.pekkoActor,