This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new e6625a5  bring docs up to date with akka-persistence-r2dbc 1.1.0 (#387)
e6625a5 is described below

commit e6625a540257aa8b4e68eb2237b0dba7ee7db7ce
Author: PJ Fanning <[email protected]>
AuthorDate: Sat May 23 12:08:50 2026 +0100

    bring docs up to date with akka-persistence-r2dbc 1.1.0 (#387)
    
    * docs: add missing files and sections from akka-persistence-r2dbc 1.1.x
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/cd0c2f8f-6f30-4593-8bd9-3b92c0f0a983
    
    Co-authored-by: pjfanning <[email protected]>
    
    * refactor
    
    * scalafmt
    
    * Update Dependencies.scala
    
    * refactor
    
    * Update BlogPostJsonColumn.java
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 docs/src/main/paradox/config.md                    |  52 +++++
 docs/src/main/paradox/durable-state-store.md       | 138 +++++++++++++-
 docs/src/main/paradox/getting-started.md           |   3 +
 docs/src/main/paradox/index.md                     |   1 +
 docs/src/main/paradox/journal.md                   |  16 ++
 docs/src/main/paradox/postgres_json.md             |  28 +++
 docs/src/main/paradox/snapshots.md                 |  11 ++
 .../java/jdocs/home/MultiPluginDocExample.java     |  68 +++++++
 docs/src/test/java/jdocs/home/state/BlogPost.java  | 211 +++++++++++++++++++++
 .../test/java/jdocs/home/state/BlogPostCounts.java |  84 ++++++++
 .../java/jdocs/home/state/BlogPostJsonColumn.java  |  49 +++++
 .../test/java/jdocs/home/state/BlogPostQuery.java  |  52 +++++
 .../java/jdocs/home/state/BlogPostTitleColumn.java |  42 ++++
 .../docs/home/query/QueryDocCompileOnly.scala      |   8 +-
 docs/src/test/scala/docs/home/state/BlogPost.scala | 116 +++++++++++
 .../scala/docs/home/state/BlogPostCounts.scala     |  84 ++++++++
 .../scala/docs/home/state/BlogPostJsonColumn.scala |  41 ++++
 .../test/scala/docs/home/state/BlogPostQuery.scala |  47 +++++
 .../docs/home/state/BlogPostTitleColumn.scala      |  50 +++++
 project/Dependencies.scala                         |   3 +-
 20 files changed, 1099 insertions(+), 5 deletions(-)

diff --git a/docs/src/main/paradox/config.md b/docs/src/main/paradox/config.md
index b6da7ae..0a51eb4 100644
--- a/docs/src/main/paradox/config.md
+++ b/docs/src/main/paradox/config.md
@@ -20,6 +20,58 @@ The following configuration can be overridden in your 
`application.conf`:
 
 @@snip [reference.conf](/core/src/main/resources/reference.conf) 
{#connection-settings}
 
+## Journal configuration
+
+Journal configuration properties are by default defined under `pekko.persistence.r2dbc.journal`.
+
+See @ref:[Journal plugin configuration](journal.md#configuration).
+
+## Snapshot configuration
+
+Snapshot store configuration properties are by default defined under `pekko.persistence.r2dbc.snapshot`.
+
+See @ref:[Snapshot store plugin configuration](snapshots.md#configuration).
+
+## Durable state configuration
+
+Durable state store configuration properties are by default defined under `pekko.persistence.r2dbc.state`.
+
+See @ref:[Durable state plugin configuration](durable-state-store.md#configuration).
+
+## Query configuration
+
+Query configuration properties are by default defined under `pekko.persistence.r2dbc.query`.
+
+See @ref:[Query plugin configuration](query.md#configuration).
+
+## Multiple plugins
+
+To enable the plugins to be used by default, add the following lines to your Pekko `application.conf`:
+
+@@snip [application.conf](/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala) {#default-config}
+
+Note that all plugins have a shared root config section `pekko.persistence.r2dbc`, which also contains the
+@ref:[Connection configuration](#connection-configuration) for the connection pool that is shared by the plugins.
+
+You can use additional plugins with different configuration, for example if more than one database is used. You would then define
+the configuration as follows:
+
+@@snip [application.conf](/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala) {#second-config}
+
+To use the additional plugin you would @scala[define]@java[override] the plugin id.
+
+Scala
+:  @@snip [MultiPluginDocExample.scala](/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/MultiPluginSpec.scala){#withPlugins}
+
+Java
+:  @@snip [MultiPluginDocExample.java](/docs/src/test/java/jdocs/home/MultiPluginDocExample.java) {#withPlugins}
+
+It is similar for `DurableStateBehavior`, @scala[define `withDurableStateStorePluginId("second-r2dbc.state")`]
+@java[override `durableStateStorePluginId` with `"second-r2dbc.state"`].
+
+For queries and Projection `SourceProvider` you would use `"second-r2dbc.query"` instead of the default @scala[`R2dbcReadJournal.Identifier`]
+@java[`R2dbcReadJournal.Identifier()`] (`"pekko.persistence.r2dbc.query"`).
+
 ## Plugin configuration at runtime
 
 Plugin implementation supports plugin configuration at runtime.
diff --git a/docs/src/main/paradox/durable-state-store.md 
b/docs/src/main/paradox/durable-state-store.md
index 397413f..0316a24 100644
--- a/docs/src/main/paradox/durable-state-store.md
+++ b/docs/src/main/paradox/durable-state-store.md
@@ -29,5 +29,139 @@ The following can be overridden in your `application.conf` 
for the journal speci
 
 ## Deletes
 
-The journal supports deletes through hard deletes, which means the durable state store entries are actually deleted from the database.
-There is no materialized view with a copy of the state so make sure to not delete durable states too early if they are used from projections or queries.
+The store supports deletes through hard deletes, which means the durable state store entries are actually deleted from
+the database. There is no materialized view with a copy of the state so make sure to not delete durable states too early
+if they are used from projections or queries.
+
+For each persistence id one tombstone record is kept in the store when the state of the persistence id has been deleted.
+The reason for the tombstone record is to keep track of the latest revision number so that subsequent state changes
+don't reuse the same revision numbers that have been deleted.
+
+See the @ref[DurableStateCleanup tool](cleanup.md#durable-state-cleanup-tool) for more information about how to delete
+state tombstone records.
+
+## State serialization
+
+The state is serialized with @extref:[Pekko Serialization](pekko:serialization.html) and the binary representation
+is stored in the `state_payload` column together with information about which serializer was used in the
+`state_ser_id` and `state_ser_manifest` columns.
+
+For PostgreSQL the payload is stored as `BYTEA` type. Alternatively, you can use the `JSONB` column type as described in
+@ref:[PostgreSQL JSON](postgres_json.md).
+
+## Storing query representation
+
+@extref:[Durable state actors](pekko:typed/durable-state/persistence.html) can only be looked up by their entity id.
+Additional indexed data can be stored as a query representation. You can either store the query representation from
+an asynchronous @ref:[Projection](projection.md) or you can store it in the same transaction as the Durable State
+upsert or delete.
+
+Advantages of storing the query representation in the same transaction as the Durable State change:
+
+* exactly-once processing and atomic update with the Durable State change
+* no eventual consistency delay from asynchronous Projection processing
+* no need for Projection offset storage
+
+That said, for write-heavy Durable State, a Projection can have the advantage of not impacting write latency of
+the Durable State updates. Note that updating the secondary index also has an impact on the write latency.
+
+### Additional columns
+
+In many cases you just want a secondary index on one or a few fields other than the entity id. For that purpose
+you can configure one or more @apidoc[AdditionalColumn] classes for an entity type. The `AdditionalColumn` will
+extract the field from the Durable State value and define how to bind it to a database column.
+
+The configuration:
+
+@@snip [application.conf](/docs/src/test/scala/docs/home/state/BlogPostTitleColumn.scala) { #additional-column-config }
+
+For each entity type you can define a list of fully qualified class names of `AdditionalColumn` implementations.
+The `AdditionalColumn` implementation may optionally define an ActorSystem constructor parameter.
+
+`AdditionalColumn` for a secondary index on the title of @extref:[blog posts (example in the Pekko documentation)](pekko:typed/durable-state/persistence.html#changing-behavior):
+
+Scala
+:  @@snip [BlogPostTitleColumn.scala](/docs/src/test/scala/docs/home/state/BlogPostTitleColumn.scala) { #additional-column }
+
+Java
+:  @@snip [BlogPostTitleColumn.java](/docs/src/test/java/jdocs/home/state/BlogPostTitleColumn.java) { #additional-column }
+
+From the `bind` method you can return one of:
+
+* @scala[`AdditionalColumn.BindValue`]@java[`AdditionalColumn.bindValue`] - bind a value such as a `String` or `Long` to the database column
+* @scala[`AdditionalColumn.BindNull`]@java[`AdditionalColumn.bindNull`] - store `null` in the database column
+* @scala[`AdditionalColumn.Skip`]@java[`AdditionalColumn.skip`] - don't update the database column for this change, keep existing value
+
+You would have to add the additional columns to the `durable_state` table definition, and create a secondary database index.
+Unless you only have one entity type it's best to define a separate table with the `custom-table` configuration, see
+example above. The full serialized state and the additional columns are stored in the custom table instead of the
+default `durable_state` table. The custom table should have the same table definition as the default
+`durable_state` table but with the extra columns added.
+
+The state can be found by the additional column and deserialized like this:
+
+Scala
+:  @@snip [BlogPostQuery.scala](/docs/src/test/scala/docs/home/state/BlogPostQuery.scala) { #query }
+
+Java
+:  @@snip [BlogPostQuery.java](/docs/src/test/java/jdocs/home/state/BlogPostQuery.java) { #query }
+
+#### Additional column as PostgreSQL JSON
+
+With PostgreSQL the additional column type can be `JSONB` to take advantage of PostgreSQL support for [JSON Types](https://www.postgresql.org/docs/current/datatype-json.html).
+
+Then you would wrap the string or byte array representation of the JSON in `io.r2dbc.postgresql.codec.Json` when
+binding the value.
+
+Scala
+:  @@snip [BlogPostJsonColumn.scala](/docs/src/test/scala/docs/home/state/BlogPostJsonColumn.scala) { #additional-column-json }
+
+Java
+:  @@snip [BlogPostJsonColumn.java](/docs/src/test/java/jdocs/home/state/BlogPostJsonColumn.java) { #additional-column-json }
+
+### Change handler
+
+For more advanced cases where the query representation would not fit in @ref:[additional columns](#additional-columns)
+you can configure a @apidoc[ChangeHandler] for an entity type. The `ChangeHandler` will be invoked for each
+Durable State change. From the `ChangeHandler` you can run database operations in the same transaction
+as the Durable State upsert or delete.
+
+The configuration:
+
+@@snip [application.conf](/docs/src/test/scala/docs/home/state/BlogPostCounts.scala) { #change-handler-config }
+
+For each entity type you can define the fully qualified class name of a `ChangeHandler` implementation.
+The `ChangeHandler` implementation may optionally define an ActorSystem constructor parameter.
+
+`ChangeHandler` for keeping track of the number of published @extref:[blog posts (example in the Pekko documentation)](pekko:typed/durable-state/persistence.html#changing-behavior):
+
+Scala
+:  @@snip [BlogPostCounts.scala](/docs/src/test/scala/docs/home/state/BlogPostCounts.scala) { #change-handler }
+
+Java
+:  @@snip [BlogPostCounts.java](/docs/src/test/java/jdocs/home/state/BlogPostCounts.java) { #change-handler }
+
+The @apidoc[DurableStateChange] parameter is an @apidoc[UpdatedDurableState] when the Durable State is created or updated.
+It is a @apidoc[DeletedDurableState] when the Durable State is deleted.
+
+The @apidoc[org.apache.pekko.persistence.r2dbc.session.*.R2dbcSession] provides the means to access an open R2DBC connection
+that can be used to process the change. The target database operations run in the same transaction as the storage
+of the Durable State change.
+
+For a given entity, one change is processed at a time. The `ChangeHandler` will not be invoked with the next change for that entity until after the `process` method returns
+and the returned @scala[`Future`]@java[`CompletionStage`] is completed.
+
+@@@ note { title="Concurrency semantics" }
+
+The `ChangeHandler` should be implemented as a stateless function without mutable state because the same
+`ChangeHandler` instance may be invoked concurrently for different entities.
+For a specific entity (persistenceId) one change is processed at a time and the `process` method will not be
+invoked with the next change for that entity until after the returned @scala[`Future`]@java[`CompletionStage`] is completed.
+
+@@@
+
+### PostgreSQL JSON payload
+
+For PostgreSQL, an alternative to defining additional columns or change handlers can be to store the state as JSON
+as described in @ref:[PostgreSQL JSON](postgres_json.md). Then you can add [secondary jsonb indexes](https://www.postgresql.org/docs/current/datatype-json.html#JSON-INDEXING)
+on the payload content for queries.
diff --git a/docs/src/main/paradox/getting-started.md 
b/docs/src/main/paradox/getting-started.md
index 21bc7a8..bdafe80 100644
--- a/docs/src/main/paradox/getting-started.md
+++ b/docs/src/main/paradox/getting-started.md
@@ -61,6 +61,9 @@ Tables and indexes:
 Postgres:
 : @@snip [create_tables.sql](/ddl-scripts/create_tables_postgres.sql)
 
+Postgres JSONB:
+: @@snip [create_tables.sql](/ddl-scripts/create_tables_postgres_jsonb.sql)
+
 Yugabyte:
 : @@snip [create_tables.sql](/ddl-scripts/create_tables_yugabyte.sql)
 
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index 2d94f93..629d0c2 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -14,6 +14,7 @@ The Pekko Persistence R2DBC plugin allows for using SQL 
database with R2DBC as a
 * [Snapshot Plugin](snapshots.md)
 * [Durable State Plugin](durable-state-store.md)
 * [Query Plugin](query.md)
+* [PostgreSQL JSON](postgres_json.md)
 * [Projection](projection.md)
 * [Cleanup Tool](cleanup.md)
 * [Migration Tool](migration.md)
diff --git a/docs/src/main/paradox/journal.md b/docs/src/main/paradox/journal.md
index 1b76ea8..c224a07 100644
--- a/docs/src/main/paradox/journal.md
+++ b/docs/src/main/paradox/journal.md
@@ -35,3 +35,19 @@ The following can be overridden in your `application.conf` 
for the journal speci
 
 The journal supports deletes through hard deletes, which means the journal 
entries are actually deleted from the database. 
 There is no materialized view with a copy of the event so make sure to not 
delete events too early if they are used from projections or queries.
+
+For each persistence id one tombstone record is kept in the event journal when all events of a persistence id have been
+deleted. The reason for the tombstone record is to keep track of the latest sequence number so that subsequent events
+don't reuse the same sequence numbers that have been deleted.
+
+See the @ref[EventSourcedCleanup tool](cleanup.md#event-sourced-cleanup-tool) for more information about how to delete
+events, snapshots and tombstone records.
+
+## Event serialization
+
+The events are serialized with @extref:[Pekko Serialization](pekko:serialization.html) and the binary representation
+is stored in the `event_payload` column together with information about which serializer was used in the
+`event_ser_id` and `event_ser_manifest` columns.
+
+For PostgreSQL the payload is stored as `BYTEA` type. Alternatively, you can use the `JSONB` column type as described in
+@ref:[PostgreSQL JSON](postgres_json.md).
diff --git a/docs/src/main/paradox/postgres_json.md 
b/docs/src/main/paradox/postgres_json.md
new file mode 100644
index 0000000..f4d7060
--- /dev/null
+++ b/docs/src/main/paradox/postgres_json.md
@@ -0,0 +1,28 @@
+# PostgreSQL JSON
+
+By default, the serialized event, snapshot and durable state payloads are stored in `BYTEA` columns. Alternatively,
+you can use the `JSONB` column type to take advantage of PostgreSQL support for [JSON Types](https://www.postgresql.org/docs/current/datatype-json.html).
+For example, you can then add secondary jsonb indexes on the payload content for queries.
+
+To enable `JSONB` payloads you need the following.
+
+1. Create the schema as shown in the Postgres JSONB tab in @ref:[Creating the schema](getting-started.md#schema).
+
+1. Define configuration:
+    ```
+    pekko.persistence.r2dbc {
+      journal.payload-column-type = JSONB
+      snapshot.payload-column-type = JSONB
+      state.payload-column-type = JSONB
+    }
+    ```
+
+1. Serialize the event, snapshot and durable state payloads as JSON bytes.
+
+For the serialization you can use:
+* @extref:[Pekko Serialization with Jackson](pekko:serialization-jackson.html) with JSON format.
+  * Make sure to disable @extref:[compression](pekko:serialization-jackson.html#compression) with `pekko.serialization.jackson.jackson-json.compression.algorithm = off`
+* Plain strings in JSON format.
+* A custom Pekko serializer whose binary format is a UTF-8 encoded JSON string.
+
+Note that you can enable this feature selectively for the event journal, snapshot, and durable state.
diff --git a/docs/src/main/paradox/snapshots.md 
b/docs/src/main/paradox/snapshots.md
index 4e1f1ab..b565197 100644
--- a/docs/src/main/paradox/snapshots.md
+++ b/docs/src/main/paradox/snapshots.md
@@ -37,3 +37,14 @@ If a `keepNSnapshots > 1` is specified for an 
`EventSourcedBehavior` that settin
 
 The reason for this is that there is no real benefit to keep multiple 
snapshots around on a relational 
 database with a high consistency.
+
+See also @ref[EventSourcedCleanup tool](cleanup.md#event-sourced-cleanup-tool).
+
+## Snapshot serialization
+
+The state is serialized with @extref:[Pekko Serialization](pekko:serialization.html) and the binary snapshot representation
+is stored in the `snapshot` column together with information about which serializer was used in the
+`ser_id` and `ser_manifest` columns.
+
+For PostgreSQL the payload is stored as `BYTEA` type. Alternatively, you can use the `JSONB` column type as described in
+@ref:[PostgreSQL JSON](postgres_json.md).
diff --git a/docs/src/test/java/jdocs/home/MultiPluginDocExample.java 
b/docs/src/test/java/jdocs/home/MultiPluginDocExample.java
new file mode 100644
index 0000000..6210cf0
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/MultiPluginDocExample.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/**
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home;
+
+import org.apache.pekko.persistence.typed.PersistenceId;
+import org.apache.pekko.persistence.typed.javadsl.CommandHandler;
+import org.apache.pekko.persistence.typed.javadsl.EventHandler;
+import org.apache.pekko.persistence.typed.javadsl.EventSourcedBehavior;
+
+public class MultiPluginDocExample {
+
+  static
+  // #withPlugins
+  public class MyEntity extends EventSourcedBehavior<MyEntity.Command, 
MyEntity.Event, MyEntity.State> {
+    // #withPlugins
+    public MyEntity(PersistenceId persistenceId) {
+      super(persistenceId);
+    }
+
+    interface Command {
+    }
+
+    interface Event {
+    }
+
+    static class State {
+    }
+
+    @Override
+    public State emptyState() {
+      return new State();
+    }
+
+    @Override
+    public CommandHandler<Command, Event, State> commandHandler() {
+      return newCommandHandlerBuilder().build();
+    }
+
+    @Override
+    public EventHandler<State, Event> eventHandler() {
+      return newEventHandlerBuilder().build();
+    }
+
+    // #withPlugins
+    @Override
+    public String journalPluginId() {
+      return "second-r2dbc.journal";
+    }
+
+    @Override
+    public String snapshotPluginId() {
+      return "second-r2dbc.snapshot";
+    }
+  }
+  // #withPlugins
+
+}
diff --git a/docs/src/test/java/jdocs/home/state/BlogPost.java 
b/docs/src/test/java/jdocs/home/state/BlogPost.java
new file mode 100644
index 0000000..f896340
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/state/BlogPost.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.state;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorRef;
+import org.apache.pekko.actor.typed.Behavior;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.persistence.typed.PersistenceId;
+import org.apache.pekko.persistence.typed.state.javadsl.CommandHandler;
+import org.apache.pekko.persistence.typed.state.javadsl.CommandHandlerBuilder;
+import org.apache.pekko.persistence.typed.state.javadsl.DurableStateBehavior;
+import org.apache.pekko.persistence.typed.state.javadsl.Effect;
+
+public class BlogPost
+    extends DurableStateBehavior<
+        BlogPost.Command, BlogPost.State> {
+  // commands and state as in above snippets
+
+  interface State {}
+
+  enum BlankState implements State {
+    INSTANCE
+  }
+
+  static class DraftState implements State {
+    final PostContent content;
+
+    DraftState(PostContent content) {
+      this.content = content;
+    }
+
+    DraftState withContent(PostContent newContent) {
+      return new DraftState(newContent);
+    }
+
+    DraftState withBody(String newBody) {
+      return withContent(new PostContent(postId(), content.title, newBody));
+    }
+
+    String postId() {
+      return content.postId;
+    }
+  }
+
+  static class PublishedState implements State {
+    final PostContent content;
+
+    PublishedState(PostContent content) {
+      this.content = content;
+    }
+
+    PublishedState withContent(PostContent newContent) {
+      return new PublishedState(newContent);
+    }
+
+    PublishedState withBody(String newBody) {
+      return withContent(new PostContent(postId(), content.title, newBody));
+    }
+
+    String postId() {
+      return content.postId;
+    }
+  }
+
+  public interface Command {}
+  public static class AddPost implements Command {
+    final PostContent content;
+    final ActorRef<AddPostDone> replyTo;
+
+    public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) {
+      this.content = content;
+      this.replyTo = replyTo;
+    }
+  }
+
+  public static class AddPostDone implements Command {
+    final String postId;
+
+    public AddPostDone(String postId) {
+      this.postId = postId;
+    }
+  }
+  public static class GetPost implements Command {
+    final ActorRef<PostContent> replyTo;
+
+    public GetPost(ActorRef<PostContent> replyTo) {
+      this.replyTo = replyTo;
+    }
+  }
+
+  public static class ChangeBody implements Command {
+    final String newBody;
+    final ActorRef<Done> replyTo;
+
+    public ChangeBody(String newBody, ActorRef<Done> replyTo) {
+      this.newBody = newBody;
+      this.replyTo = replyTo;
+    }
+  }
+
+  public static class Publish implements Command {
+    final ActorRef<Done> replyTo;
+
+    public Publish(ActorRef<Done> replyTo) {
+      this.replyTo = replyTo;
+    }
+  }
+
+  public static class PostContent implements Command {
+    final String postId;
+    final String title;
+    final String body;
+
+    public PostContent(String postId, String title, String body) {
+      this.postId = postId;
+      this.title = title;
+      this.body = body;
+    }
+  }
+
+  public static Behavior<Command> create(String entityId, PersistenceId 
persistenceId) {
+    return Behaviors.setup(
+        context -> {
+          context.getLog().info("Starting BlogPostEntityDurableState {}", 
entityId);
+          return new BlogPost(persistenceId);
+        });
+  }
+
+  private BlogPost(PersistenceId persistenceId) {
+    super(persistenceId);
+  }
+
+  @Override
+  public State emptyState() {
+    return BlankState.INSTANCE;
+  }
+
+  @Override
+  public CommandHandler<Command, State> commandHandler() {
+    CommandHandlerBuilder<Command, State> builder = newCommandHandlerBuilder();
+
+    builder.forStateType(BlankState.class).onCommand(AddPost.class, 
this::onAddPost);
+
+    builder
+        .forStateType(DraftState.class)
+        .onCommand(ChangeBody.class, this::onChangeBody)
+        .onCommand(Publish.class, this::onPublish)
+        .onCommand(GetPost.class, this::onGetPost);
+
+    builder
+        .forStateType(PublishedState.class)
+        .onCommand(ChangeBody.class, this::onChangeBody)
+        .onCommand(GetPost.class, this::onGetPost);
+
+    builder.forAnyState().onCommand(AddPost.class, (state, cmd) -> 
Effect().unhandled());
+
+    return builder.build();
+  }
+
+  private Effect<State> onAddPost(AddPost cmd) {
+    return Effect()
+        .persist(new DraftState(cmd.content))
+        .thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
+  }
+
+  private Effect<State> onChangeBody(DraftState state, ChangeBody cmd) {
+    return Effect()
+        .persist(state.withBody(cmd.newBody))
+        .thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
+  }
+
+  private Effect<State> onChangeBody(PublishedState state, ChangeBody cmd) {
+    return Effect()
+        .persist(state.withBody(cmd.newBody))
+        .thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
+  }
+
+  private Effect<State> onPublish(DraftState state, Publish cmd) {
+    return Effect()
+        .persist(new PublishedState(state.content))
+        .thenRun(
+            () -> {
+              System.out.println("Blog post published: " + state.postId());
+              cmd.replyTo.tell(Done.getInstance());
+            });
+  }
+
+  private Effect<State> onGetPost(DraftState state, GetPost cmd) {
+    cmd.replyTo.tell(state.content);
+    return Effect().none();
+  }
+
+  private Effect<State> onGetPost(PublishedState state, GetPost cmd) {
+    cmd.replyTo.tell(state.content);
+    return Effect().none();
+  }
+
+  // commandHandler, eventHandler as in above snippets
+}
diff --git a/docs/src/test/java/jdocs/home/state/BlogPostCounts.java 
b/docs/src/test/java/jdocs/home/state/BlogPostCounts.java
new file mode 100644
index 0000000..df4c1d3
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/state/BlogPostCounts.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/**
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.state;
+
+// #change-handler
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.persistence.Persistence;
+import org.apache.pekko.persistence.query.DeletedDurableState;
+import org.apache.pekko.persistence.query.DurableStateChange;
+import org.apache.pekko.persistence.query.UpdatedDurableState;
+import org.apache.pekko.persistence.r2dbc.session.javadsl.R2dbcSession;
+import org.apache.pekko.persistence.r2dbc.state.javadsl.ChangeHandler;
+import io.r2dbc.spi.Statement;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * Keep track of number of published blog posts. Count per slice.
+ *
+ * <pre>
+ * CREATE TABLE post_count (slice INT NOT NULL, cnt BIGINT NOT NULL, PRIMARY KEY(slice));
+ * </pre>
+ */
+public class BlogPostCounts implements ChangeHandler<BlogPost.State> {
+
+  private final ActorSystem<?> system;
+
+  private final String incrementSql =
+      "INSERT INTO post_count (slice, cnt) VALUES ($1, 1) " +
+          "ON CONFLICT (slice) DO UPDATE SET cnt = post_count.cnt + 1";
+
+  private final String decrementSql =
+      "UPDATE post_count SET cnt = cnt - 1 WHERE slice = $1";
+
+  public BlogPostCounts(ActorSystem<?> system) {
+    this.system = system;
+  }
+
+  @Override
+  public CompletionStage<Done> process(R2dbcSession session, 
DurableStateChange<BlogPost.State> change) {
+    if (change instanceof UpdatedDurableState updatedDurableState) {
+      return processUpdate(session, updatedDurableState);
+    } else if (change instanceof DeletedDurableState deletedDurableState) {
+      return processDelete(session, deletedDurableState);
+    } else {
+      throw new IllegalArgumentException("Unexpected change " + 
change.getClass().getName());
+    }
+  }
+
+  private CompletionStage<Done> processUpdate(R2dbcSession session, 
UpdatedDurableState<BlogPost.State> upd) {
+    if (upd.value() instanceof BlogPost.PublishedState) {
+      int slice = 
Persistence.get(system).sliceForPersistenceId(upd.persistenceId());
+      Statement stmt = session
+          .createStatement(incrementSql)
+          .bind(0, slice);
+      return session.updateOne(stmt).thenApply(count -> Done.getInstance());
+    } else {
+      return CompletableFuture.completedFuture(Done.getInstance());
+    }
+  }
+
+  private CompletionStage<Done> processDelete(R2dbcSession session, 
DeletedDurableState<BlogPost.State> del) {
+    int slice = 
Persistence.get(system).sliceForPersistenceId(del.persistenceId());
+    Statement stmt = session
+        .createStatement(decrementSql)
+        .bind(0, slice);
+    return session.updateOne(stmt).thenApply(count -> Done.getInstance());
+  }
+
+}
+// #change-handler
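
The handler above keeps one counter row per slice. As a rough illustration of how a persistence id could map to a slice, the sketch below assumes the slice is the absolute value of the id's hash code modulo 1024 slices. That formula, the constant, and the class name `SliceSketch` are assumptions made for illustration and are not part of this commit; real code should call `Persistence.get(system).sliceForPersistenceId(...)` as the handler does.

```java
// Hypothetical stand-alone sketch; not part of the commit.
final class SliceSketch {
  // Assumed number of slices (illustrative constant).
  static final int NUMBER_OF_SLICES = 1024;

  private SliceSketch() {}

  static int sliceFor(String persistenceId) {
    // The remainder lies strictly between -1024 and 1024,
    // so Math.abs cannot overflow here.
    return Math.abs(persistenceId.hashCode() % NUMBER_OF_SLICES);
  }

  public static void main(String[] args) {
    // Every persistence id lands in exactly one slice, so each
    // published post increments exactly one post_count row.
    System.out.println(sliceFor("BlogPost|post-1"));
  }
}
```

Because many persistence ids share a slice, the `post_count` table stays small, and the total number of published posts is the sum of `cnt` over all rows.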
diff --git a/docs/src/test/java/jdocs/home/state/BlogPostJsonColumn.java 
b/docs/src/test/java/jdocs/home/state/BlogPostJsonColumn.java
new file mode 100644
index 0000000..019f562
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/state/BlogPostJsonColumn.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/**
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.state;
+
+import org.apache.pekko.persistence.r2dbc.state.javadsl.AdditionalColumn;
+// #additional-column-json
+import io.r2dbc.postgresql.codec.Json;
+
+public class BlogPostJsonColumn extends AdditionalColumn<BlogPost.State, Json> 
{
+  @Override
+  public Class<Json> fieldClass() {
+    return Json.class;
+  }
+
+  @Override
+  public String columnName() {
+    return "query_json";
+  }
+
+  @Override
+  public Binding<Json> bind(Upsert<BlogPost.State> upsert) {
+    BlogPost.State state = upsert.value();
+    if (state instanceof BlogPost.DraftState s) {
+      // a json library would be used here
+      String jsonString = "{\"title\": \"" + s.content.title + "\", \"published\": false}";
+      Json json = Json.of(jsonString);
+      return AdditionalColumn.bindValue(json);
+    } else if (state instanceof BlogPost.PublishedState s) {
+      // a json library would be used here
+      String jsonString = "{\"title\": \"" + s.content.title + "\", \"published\": true}";
+      Json json = Json.of(jsonString);
+      return AdditionalColumn.bindValue(json);
+    } else {
+      return AdditionalColumn.skip();
+    }
+  }
+}
+// #additional-column-json
diff --git a/docs/src/test/java/jdocs/home/state/BlogPostQuery.java b/docs/src/test/java/jdocs/home/state/BlogPostQuery.java
new file mode 100644
index 0000000..5b04166
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/state/BlogPostQuery.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/**
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.state;
+
+// #query
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.persistence.r2dbc.session.javadsl.R2dbcSession;
+import org.apache.pekko.serialization.SerializationExtension;
+import io.r2dbc.spi.Statement;
+
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+
+public class BlogPostQuery {
+  private final ActorSystem<?> system;
+
+  public BlogPostQuery(ActorSystem<?> system) {
+    this.system = system;
+  }
+
+  private final String findByTitleSql =
+      "SELECT state_ser_id, state_ser_manifest, state_payload " +
+      "FROM durable_state_blog_post " +
+      "WHERE title = $1";
+
+  public CompletionStage<List<BlogPost.State>> findByTitle(String title) {
+    return R2dbcSession.withSession(system, session -> {
+      Statement stmt = session.createStatement(findByTitleSql).bind(0, title);
+      return session.select(stmt, row -> {
+        int serializerId = row.get("state_ser_id", Integer.class);
+        String serializerManifest = row.get("state_ser_manifest", String.class);
+        byte[] payload = row.get("state_payload", byte[].class);
+        BlogPost.State state = (BlogPost.State) SerializationExtension.get(system)
+            .deserialize(payload, serializerId, serializerManifest).get();
+        return state;
+      });
+    });
+  }
+
+}
+// #query
diff --git a/docs/src/test/java/jdocs/home/state/BlogPostTitleColumn.java b/docs/src/test/java/jdocs/home/state/BlogPostTitleColumn.java
new file mode 100644
index 0000000..a3f1402
--- /dev/null
+++ b/docs/src/test/java/jdocs/home/state/BlogPostTitleColumn.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/**
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.state;
+
+// #additional-column
+import org.apache.pekko.persistence.r2dbc.state.javadsl.AdditionalColumn;
+
+public class BlogPostTitleColumn extends AdditionalColumn<BlogPost.State, String> {
+  @Override
+  public Class<String> fieldClass() {
+    return String.class;
+  }
+
+  @Override
+  public String columnName() {
+    return "title";
+  }
+
+  @Override
+  public Binding<String> bind(Upsert<BlogPost.State> upsert) {
+    BlogPost.State state = upsert.value();
+    if (state.equals(BlogPost.BlankState.INSTANCE)) {
+      return AdditionalColumn.bindNull();
+    } else if (state instanceof BlogPost.DraftState draft) {
+      return AdditionalColumn.bindValue(draft.content.title);
+    } else {
+      return AdditionalColumn.skip();
+    }
+  }
+}
+// #additional-column
diff --git a/docs/src/test/scala/docs/home/query/QueryDocCompileOnly.scala b/docs/src/test/scala/docs/home/query/QueryDocCompileOnly.scala
index d568e45..cfc725b 100644
--- a/docs/src/test/scala/docs/home/query/QueryDocCompileOnly.scala
+++ b/docs/src/test/scala/docs/home/query/QueryDocCompileOnly.scala
@@ -7,6 +7,10 @@
  * This file is part of the Apache Pekko project, which was derived from Akka.
  */
 
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
 package docs.home.query
 
 import org.apache.pekko
@@ -50,7 +54,7 @@ object QueryDocCompileOnly {
 
   {
     // #currentEventsBySlices
-    import org.apache.pekko.persistence.query.typed.EventEnvelope
+    import pekko.persistence.query.typed.EventEnvelope
 
     // Slit the slices into 4 ranges
     val numberOfSliceRanges: Int = 4
@@ -71,7 +75,7 @@ object QueryDocCompileOnly {
 
   {
     // #currentChangesBySlices
-    import org.apache.pekko.persistence.query.UpdatedDurableState
+    import pekko.persistence.query.UpdatedDurableState
 
     // Slit the slices into 4 ranges
     val numberOfSliceRanges: Int = 4
diff --git a/docs/src/test/scala/docs/home/state/BlogPost.scala b/docs/src/test/scala/docs/home/state/BlogPost.scala
new file mode 100644
index 0000000..e8475fd
--- /dev/null
+++ b/docs/src/test/scala/docs/home/state/BlogPost.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.state
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.Behavior
+import pekko.actor.typed.scaladsl.Behaviors
+import pekko.pattern.StatusReply
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.state.scaladsl.DurableStateBehavior
+import pekko.persistence.typed.state.scaladsl.Effect
+
+object BlogPost {
+  val EntityTypeName = "BlogPost"
+
+  sealed trait State
+
+  case object BlankState extends State
+
+  final case class DraftState(content: PostContent) extends State {
+    def withBody(newBody: String): DraftState =
+      copy(content = content.copy(body = newBody))
+
+    def postId: String = content.postId
+  }
+
+  final case class PublishedState(content: PostContent) extends State {
+    def postId: String = content.postId
+  }
+
+  sealed trait Command
+  final case class AddPost(content: PostContent, replyTo: ActorRef[StatusReply[AddPostDone]]) extends Command
+  final case class AddPostDone(postId: String)
+  final case class GetPost(replyTo: ActorRef[PostContent]) extends Command
+  final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends Command
+  final case class Publish(replyTo: ActorRef[Done]) extends Command
+  final case class PostContent(postId: String, title: String, body: String)
+
+  def apply(entityId: String, persistenceId: PersistenceId): Behavior[Command] = {
+    Behaviors.setup { context =>
+      context.log.info("Starting BlogPostEntityDurableState {}", entityId)
+      DurableStateBehavior[Command, State](persistenceId, emptyState = BlankState, commandHandler)
+    }
+  }
+
+  private val commandHandler: (State, Command) => Effect[State] = { (state, command) =>
+    state match {
+
+      case BlankState =>
+        command match {
+          case cmd: AddPost => addPost(cmd)
+          case _            => Effect.unhandled
+        }
+
+      case draftState: DraftState =>
+        command match {
+          case cmd: ChangeBody     => changeBody(draftState, cmd)
+          case Publish(replyTo)    => publish(draftState, replyTo)
+          case GetPost(replyTo)    => getPost(draftState, replyTo)
+          case AddPost(_, replyTo) =>
+            Effect.unhandled[State].thenRun(_ => replyTo ! StatusReply.Error("Cannot add post while in draft state"))
+        }
+
+      case publishedState: PublishedState =>
+        command match {
+          case GetPost(replyTo)    => getPost(publishedState, replyTo)
+          case AddPost(_, replyTo) =>
+            Effect.unhandled[State].thenRun(_ => replyTo ! StatusReply.Error("Cannot add post, already published"))
+          case _ => Effect.unhandled
+        }
+    }
+  }
+
+  private def addPost(cmd: AddPost): Effect[State] = {
+    Effect.persist(DraftState(cmd.content)).thenRun { _ =>
+      cmd.replyTo ! StatusReply.Success(AddPostDone(cmd.content.postId))
+    }
+  }
+
+  private def changeBody(state: DraftState, cmd: ChangeBody): Effect[State] = {
+    Effect.persist(state.withBody(cmd.newBody)).thenRun { _ =>
+      cmd.replyTo ! Done
+    }
+  }
+
+  private def publish(state: DraftState, replyTo: ActorRef[Done]): Effect[State] = {
+    Effect.persist(PublishedState(state.content)).thenRun { _ =>
+      println(s"Blog post ${state.postId} was published")
+      replyTo ! Done
+    }
+  }
+
+  private def getPost(state: DraftState, replyTo: ActorRef[PostContent]): Effect[State] = {
+    replyTo ! state.content
+    Effect.none
+  }
+
+  private def getPost(state: PublishedState, replyTo: ActorRef[PostContent]): Effect[State] = {
+    replyTo ! state.content
+    Effect.none
+  }
+
+}
diff --git a/docs/src/test/scala/docs/home/state/BlogPostCounts.scala b/docs/src/test/scala/docs/home/state/BlogPostCounts.scala
new file mode 100644
index 0000000..fb5a810
--- /dev/null
+++ b/docs/src/test/scala/docs/home/state/BlogPostCounts.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.state
+
+// #change-handler
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.Persistence
+import pekko.persistence.query.DeletedDurableState
+import pekko.persistence.query.DurableStateChange
+import pekko.persistence.query.UpdatedDurableState
+import pekko.persistence.r2dbc.session.scaladsl.R2dbcSession
+import pekko.persistence.r2dbc.state.scaladsl.ChangeHandler
+
+// #change-handler
+
+/* config:
+// #change-handler-config
+pekko.persistence.r2dbc.state {
+  change-handler {
+    "BlogPost" = "docs.home.state.BlogPostCounts"
+  }
+}
+// #change-handler-config
+ */
+
+// #change-handler
+/**
+ * Keep track of number of published blog posts. Count per slice.
+ *
+ * {{{
+ * CREATE TABLE post_count (slice INT NOT NULL, cnt BIGINT NOT NULL, PRIMARY KEY(slice));
+ * }}}
+ */
+class BlogPostCounts(system: ActorSystem[_]) extends ChangeHandler[BlogPost.State] {
+
+  private val incrementSql =
+    "INSERT INTO post_count (slice, cnt) VALUES ($1, 1) " +
+    "ON CONFLICT (slice) DO UPDATE SET cnt = post_count.cnt + 1"
+
+  private val decrementSql =
+    "UPDATE post_count SET cnt = cnt - 1 WHERE slice = $1"
+
+  private implicit val ec: ExecutionContext = system.executionContext
+
+  override def process(session: R2dbcSession, change: DurableStateChange[BlogPost.State]): Future[Done] = {
+    change match {
+      case upd: UpdatedDurableState[BlogPost.State] =>
+        upd.value match {
+          case _: BlogPost.PublishedState =>
+            val slice = Persistence(system).sliceForPersistenceId(upd.persistenceId)
+            val stmt = session
+              .createStatement(incrementSql)
+              .bind(0, slice)
+            session.updateOne(stmt).map(_ => Done)
+          case _ =>
+            Future.successful(Done)
+        }
+
+      case del: DeletedDurableState[BlogPost.State] =>
+        val slice = Persistence(system).sliceForPersistenceId(del.persistenceId)
+        val stmt = session
+          .createStatement(decrementSql)
+          .bind(0, slice)
+        session.updateOne(stmt).map(_ => Done)
+    }
+  }
+}
+// #change-handler
diff --git a/docs/src/test/scala/docs/home/state/BlogPostJsonColumn.scala b/docs/src/test/scala/docs/home/state/BlogPostJsonColumn.scala
new file mode 100644
index 0000000..7a3b075
--- /dev/null
+++ b/docs/src/test/scala/docs/home/state/BlogPostJsonColumn.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.state
+
+import org.apache.pekko
+import pekko.persistence.r2dbc.state.scaladsl.AdditionalColumn
+// #additional-column-json
+import io.r2dbc.postgresql.codec.Json
+
+class BlogPostJsonColumn extends AdditionalColumn[BlogPost.State, Json] {
+
+  override val columnName: String = "query_json"
+
+  override def bind(upsert: AdditionalColumn.Upsert[BlogPost.State]): AdditionalColumn.Binding[Json] =
+    upsert.value match {
+      case s: BlogPost.DraftState =>
+        // a json library would be used here
+        val jsonString = s"""{"title": "${s.content.title}", "published": false}"""
+        val json = Json.of(jsonString)
+        AdditionalColumn.BindValue(json)
+      case s: BlogPost.PublishedState =>
+        // a json library would be used here
+        val jsonString = s"""{"title": "${s.content.title}", "published": true}"""
+        val json = Json.of(jsonString)
+        AdditionalColumn.BindValue(json)
+      case _ =>
+        AdditionalColumn.Skip
+    }
+}
+// #additional-column-json
diff --git a/docs/src/test/scala/docs/home/state/BlogPostQuery.scala b/docs/src/test/scala/docs/home/state/BlogPostQuery.scala
new file mode 100644
index 0000000..fefe055
--- /dev/null
+++ b/docs/src/test/scala/docs/home/state/BlogPostQuery.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.state
+
+// #query
+import scala.concurrent.Future
+
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.r2dbc.session.scaladsl.R2dbcSession
+import pekko.serialization.SerializationExtension
+
+class BlogPostQuery(system: ActorSystem[_]) {
+
+  private val findByTitleSql =
+    "SELECT state_ser_id, state_ser_manifest, state_payload " +
+    "FROM durable_state_blog_post " +
+    "WHERE title = $1"
+
+  def findByTitle(title: String): Future[IndexedSeq[BlogPost.State]] = {
+    R2dbcSession.withSession(system) { session =>
+      session.select(session.createStatement(findByTitleSql).bind(0, title)) { row =>
+        val serializerId = row.get("state_ser_id", classOf[java.lang.Integer])
+        val serializerManifest = row.get("state_ser_manifest", classOf[String])
+        val payload = row.get("state_payload", classOf[Array[Byte]])
+        val state = SerializationExtension(system)
+          .deserialize(payload, serializerId, serializerManifest)
+          .get
+          .asInstanceOf[BlogPost.State]
+        state
+      }
+    }
+  }
+
+}
+// #query
diff --git a/docs/src/test/scala/docs/home/state/BlogPostTitleColumn.scala b/docs/src/test/scala/docs/home/state/BlogPostTitleColumn.scala
new file mode 100644
index 0000000..334882f
--- /dev/null
+++ b/docs/src/test/scala/docs/home/state/BlogPostTitleColumn.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.state
+
+// #additional-column
+import org.apache.pekko
+import pekko.persistence.r2dbc.state.scaladsl.AdditionalColumn
+
+// #additional-column
+
+/* config:
+// #additional-column-config
+pekko.persistence.r2dbc.state {
+  additional-columns {
+    "BlogPost" = ["docs.home.state.BlogPostTitleColumn"]
+  }
+  custom-table {
+    "BlogPost" =  durable_state_blog_post
+  }
+}
+// #additional-column-config
+ */
+
+// #additional-column
+class BlogPostTitleColumn extends AdditionalColumn[BlogPost.State, String] {
+
+  override val columnName: String = "title"
+
+  override def bind(upsert: AdditionalColumn.Upsert[BlogPost.State]): AdditionalColumn.Binding[String] =
+    upsert.value match {
+      case BlogPost.BlankState =>
+        AdditionalColumn.BindNull
+      case s: BlogPost.DraftState =>
+        AdditionalColumn.BindValue(s.content.title)
+      case _: BlogPost.PublishedState =>
+        AdditionalColumn.Skip
+    }
+}
+// #additional-column
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index ecf61ae..c6b42ed 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -84,7 +84,8 @@ object Dependencies {
 
   val docs = Seq(
     TestDeps.pekkoPersistenceTyped,
-    TestDeps.pekkoShardingTyped)
+    TestDeps.pekkoShardingTyped,
+    r2dbcPostgres % "provided,test")
 
   val pekkoTestDependencyOverrides = Seq(
     TestDeps.pekkoActor,

