This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new 9f5a4ec add r2dbc docs (#489)
9f5a4ec is described below
commit 9f5a4ec0edeefd89a0944630a7dc649d20a31357
Author: PJ Fanning <[email protected]>
AuthorDate: Sat May 23 11:34:08 2026 +0100
add r2dbc docs (#489)
* r2dbc docs
* Update build.sbt
* Update R2dbcProjectionDocExample.java
* Update R2dbcProjectionDocExample.scala
* Update R2dbcProjectionDocExample.scala
* Update R2dbcProjectionDocExample.scala
---
.github/workflows/integration-tests-r2dbc.yml | 6 +-
build.sbt | 7 +
docs/src/main/paradox/index.md | 3 +-
docs/src/main/paradox/r2dbc.md | 275 ++++++++++++++++++++
project/Dependencies.scala | 200 +++++++--------
.../ddl-scripts}/create_tables_postgres.sql | 0
.../ddl-scripts/create_tables_yugabyte.sql | 17 +-
.../home/projection/R2dbcProjectionDocExample.java | 283 +++++++++++++++++++++
.../test/scala/docs/home/CborSerializable.scala | 16 ++
.../projection/R2dbcProjectionDocExample.scala | 274 ++++++++++++++++++++
10 files changed, 967 insertions(+), 114 deletions(-)
diff --git a/.github/workflows/integration-tests-r2dbc.yml
b/.github/workflows/integration-tests-r2dbc.yml
index 3249015..5d98546 100644
--- a/.github/workflows/integration-tests-r2dbc.yml
+++ b/.github/workflows/integration-tests-r2dbc.yml
@@ -58,11 +58,11 @@ jobs:
docker compose -f docker/docker-compose-postgres.yml up -d
# TODO: could we poll the port instead of sleep?
sleep 10
- docker exec -i docker-postgres-db-1 psql -U postgres -t <
ddl-scripts/create_tables_postgres.sql
+ docker exec -i docker-postgres-db-1 psql -U postgres -t <
r2dbc-int-test/ddl-scripts/create_tables_postgres.sql
docker exec -i docker-postgres-db-1 psql -U postgres -t -c 'CREATE
DATABASE database1;'
- docker exec -i docker-postgres-db-1 psql -U postgres -t -d database1
< ddl-scripts/create_tables_postgres.sql
+ docker exec -i docker-postgres-db-1 psql -U postgres -t -d database1
< r2dbc-int-test/ddl-scripts/create_tables_postgres.sql
docker exec -i docker-postgres-db-1 psql -U postgres -t -c 'CREATE
DATABASE database2;'
- docker exec -i docker-postgres-db-1 psql -U postgres -t -d database2
< ddl-scripts/create_tables_postgres.sql
+ docker exec -i docker-postgres-db-1 psql -U postgres -t -d database2
< r2dbc-int-test/ddl-scripts/create_tables_postgres.sql
- name: Run all integration tests with default Scala and Java ${{
matrix.java-version }}
run: sbt "r2dbc-int-test/test" ${{ matrix.extraOpts }}
diff --git a/build.sbt b/build.sbt
index f5f18cf..9d5e938 100644
--- a/build.sbt
+++ b/build.sbt
@@ -203,6 +203,7 @@ lazy val grpcTest =
Test / fork := true,
Test / javaOptions +=
"--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED")
.dependsOn(grpc % "compile;test->compile")
+ .dependsOn(r2dbc % Test)
.dependsOn(testkit % Test)
lazy val grpcIntTest =
@@ -297,6 +298,12 @@ lazy val docs = project
"scaladoc.pekko.base_url" ->
s"https://pekko.apache.org/api/pekko/${Dependencies.PekkoVersionInDocs}/",
"javadoc.pekko.base_url" ->
s"https://pekko.apache.org/japi/pekko/${Dependencies.PekkoVersionInDocs}/",
"javadoc.pekko.link_style" -> "direct",
+ "extref.pekko-persistence-r2dbc.base_url" ->
+
s"https://pekko.apache.org/docs/pekko-persistence-r2dbc/${Dependencies.PekkoPersistenceR2dbcVersionInDocs}/%s",
+ "scaladoc.org.apache.pekko.persistence.r2dbc.base_url" ->
+
s"https://pekko.apache.org/api/pekko-persistence-r2dbc/${Dependencies.PekkoPersistenceR2dbcVersionInDocs}",
+ "javadoc.org.apache.pekko.persistence.r2dbc.base_url" ->
+
s"https://pekko.apache.org/japi/pekko-persistence-r2dbc/${Dependencies.PekkoPersistenceR2dbcVersionInDocs}",
// Java
"javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
// Scala
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index c94425e..6c07a76 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -10,7 +10,7 @@
* [Event Sourced](eventsourced.md)
* [Durable State](durable-state.md)
* [Kafka](kafka.md)
-* [gRPC](grpc.md)
+* [R2DBC](r2dbc.md)
* [Cassandra](cassandra.md)
* [JDBC](jdbc.md)
* [Slick](slick.md)
@@ -19,6 +19,7 @@
* [Flow](flow.md)
* [Error](error.md)
* [Projection Settings](projection-settings.md)
+* [gRPC](grpc.md)
* [Management](management.md)
* [Testing](testing.md)
* [Classic](classic.md)
diff --git a/docs/src/main/paradox/r2dbc.md b/docs/src/main/paradox/r2dbc.md
new file mode 100644
index 0000000..6cc2f85
--- /dev/null
+++ b/docs/src/main/paradox/r2dbc.md
@@ -0,0 +1,275 @@
+# Offset in a relational DB with R2DBC
+
+The @apidoc[R2dbcProjection$] has support for storing the offset in a
relational database using R2DBC via @extref:[Apache Pekko Persistence
R2DBC](pekko-persistence-r2dbc:overview.html).
+
+The source of the envelopes is from a `SourceProvider`, which can be:
+
+* events from Event Sourced entities via the @ref:[SourceProvider for
eventsBySlices](eventsourced.md#sourceprovider-for-eventsbyslices) with the
@extref:[eventsBySlices
query](pekko-persistence-r2dbc:query.html#eventsbyslices)
+* state changes for Durable State entities via the @ref:[SourceProvider for
changesBySlices](durable-state.md#sourceprovider-for-changesbyslices) with the
@extref:[changesBySlices
query](pekko-persistence-r2dbc:query.html#changesbyslices)
+* any other `SourceProvider` with supported @ref:[offset types](#offset-types)
+
+A @apidoc[R2dbcHandler] receives a @apidoc[pekko.projection.*.R2dbcSession]
instance and an envelope. The
+`R2dbcSession` provides the means to access an open R2DBC connection that can
be used to process the envelope.
+The target database operations can be run in the same transaction as the
storage of the offset, which means
+that @ref:[exactly-once](#exactly-once) processing semantics is supported. It
also offers
+@ref:[at-least-once](#at-least-once) semantics.
+
+## Dependencies
+
+To use the R2DBC module of Pekko Projections add the following dependency in
your project:
+
+@@dependency [Maven,sbt,Gradle] {
+group=org.apache.pekko
+artifact=pekko-projection-r2dbc_$scala.binary.version$
+version=$project.version$
+group2=org.apache.pekko
+artifact2=pekko-persistence-r2dbc_$scala.binary.version$
+version2=$pekko.r2dbc.version$
+}
+
+Pekko Projections R2DBC depends on Pekko $pekko.version$ or later, and note
that it is important that all `pekko-*`
+dependencies are in the same version, so it is recommended to depend on them
explicitly to avoid problems
+with transient dependencies causing an unlucky mix of versions.
+
+@@project-info{ projectId="r2dbc" }
+
+
+### Transitive dependencies
+
+The table below shows `pekko-projection-r2dbc`'s direct dependencies, and the
second tab shows all libraries it depends on transitively.
+
+@@dependencies{ projectId="r2dbc" }
+
+## Schema
+
+The `projection_offset_store`, `projection_timestamp_offset_store` and
`projection_management` tables
+need to be created in the configured database:
+
+PostgreSQL
+: @@snip [PostgreSQL
Schema](/r2dbc-int-test/ddl-scripts/create_tables_postgres.sql)
+
+YugaByte
+: @@snip [YugaByte
Schema](/r2dbc-int-test/ddl-scripts/create_tables_yugabyte.sql)
+
+## Configuration
+
+By default, `pekko-projection-r2dbc` uses the same connection pool and
`dialect` as `pekko-persistence-r2dbc`, see
+@extref:[Connection
configuration](pekko-persistence-r2dbc:config.html#connection-configuration).
+
+### Reference configuration
+
+The following can be overridden in your `application.conf` for the Projection
specific settings:
+
+@@snip [reference.conf](/r2dbc/src/main/resources/reference.conf)
{#projection-config}
+
+## Running with Sharded Daemon Process
+
+The Sharded Daemon Process can be used to distribute `n` instances of a given
Projection across the cluster.
+Therefore, it's important that each Projection instance consumes a subset of
the stream of envelopes.
+
+When using `eventsBySlices` the initialization code looks like this:
+
+Scala
+: @@snip
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
{ #initProjections }
+
+Java
+: @@snip
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
{ #initProjections }
+
+The @ref:[`ShoppingCartHandler` is shown below](#handler).
+
+It is possible to dynamically scale the number of Projection instances as
described in @extref:[Sharded Daemon Process
documentation](pekko:typed/cluster-sharded-daemon-process.html#dynamic-scaling-of-number-of-workers).
+
+There are alternative ways of running the `ProjectionBehavior` as described in
@ref:[Running a Projection](running.md), but note that when using the R2DBC
plugin as `SourceProvider` it is recommended to use `eventsBySlices` and not
`eventsByTag`.
+
+## Slices
+
+The `SourceProvider` for Event Sourced actors has historically been using
`eventsByTag` but the R2DBC plugin is
+instead providing `eventsBySlices` as an improved solution.
+
+The usage of `eventsByTag` for Projections has the drawback that the number of
tags must be decided
+up-front and can't easily be changed afterwards. Starting with too many tags
means much overhead since
+many projection instances would be running on each node in a small Pekko
Cluster. Each projection instance
+polling the database periodically. Starting with too few tags means that it
can't be scaled later to more
+Pekko nodes.
+
+With `eventsBySlices` more Projection instances can be added when needed and
still reuse the offsets
+for the previous slice distributions.
+
+A slice is deterministically defined based on the persistence id. The purpose
is to evenly distribute all
+persistence ids over the slices. The `eventsBySlices` query is for a range of
the slices. For example if
+using 1024 slices and running 4 Projection instances the slice ranges would be
0-255, 256-511, 512-767, 768-1023.
+Changing to 8 slice ranges means that the ranges would be 0-127, 128-255,
256-383, ..., 768-895, 896-1023.
+
+However, when changing the number of slices the projections with the old slice
distribution must be
+stopped before starting new projections. That can be done with a full shutdown
before deploying the
+new slice distribution or pause (stop) the projections with @ref:[the
management API](management.md).
+
+When using `R2dbcProjection` together with the
`EventSourcedProvider.eventsBySlices` the events will be delivered in
+sequence number order without duplicates.
+
+When using `R2dbcProjection` together with
`DurableStateSourceProvider.changesBySlices` the changes will be delivered
+in revision number order without duplicates.
+
+## exactly-once
+
+The offset is stored in the same transaction used for the user defined
`handler`, which means exactly-once
+processing semantics if the projection is restarted from previously stored
offset.
+
+Scala
+: @@snip
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
{ #exactlyOnce }
+
+Java
+: @@snip
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
{ #exactlyOnce }
+
+The @ref:[`ShoppingCartHandler` is shown below](#handler).
+
+## at-least-once
+
+The offset is stored after the envelope has been processed and giving
at-least-once processing semantics.
+This means that if the projection is restarted from a previously stored offset
some elements may be processed more
+than once. Therefore, the @ref:[Handler](#handler) code must be idempotent.
+
+Scala
+: @@snip
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
{ #atLeastOnce }
+
+Java
+: @@snip
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
{ #atLeastOnce }
+
+The offset is stored after a time window, or limited by a number of envelopes,
whatever happens first.
+This window can be defined with `withSaveOffset` of the returned
`AtLeastOnceProjection`.
+The default settings for the window is defined in configuration section
`pekko.projection.at-least-once`.
+There is a performance benefit of not storing the offset too often, but the
drawback is that there can be more
+duplicates when the projection that will be processed again when the
projection is restarted.
+
+The @ref:[`ShoppingCartHandler` is shown below](#handler).
+
+## groupedWithin
+
+The envelopes can be grouped before processing, which can be useful for batch
updates.
+
+Scala
+: @@snip
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
{ #grouped }
+
+Java
+: @@snip
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
{ #grouped }
+
+The envelopes are grouped within a time window, or limited by a number of
envelopes, whatever happens first.
+This window can be defined with `withGroup` of the returned
`GroupedProjection`. The default settings for
+the window is defined in configuration section `pekko.projection.grouped`.
+
+When using `groupedWithin` the handler is a
@scala[`R2dbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]]`]@java[`R2dbcHandler<List<EventEnvelope<ShoppingCart.Event>>>`].
+The @ref:[`GroupedShoppingCartHandler` is shown below](#grouped-handler).
+
+The offset is stored in the same transaction used for the user defined
`handler`, which means exactly-once
+processing semantics if the projection is restarted from previously stored
offset.
+
+## Handler
+
+It's in the @apidoc[R2dbcHandler] that you implement the processing of each
envelope. It's essentially a consumer function
+from `(R2dbcSession, Envelope)` to
@scala[`Future[Done]`]@java[`CompletionStage<Done>`].
+
+A handler that is consuming `ShoppingCart.Event` from `eventsBySlices` can
look like this:
+
+Scala
+: @@snip
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
{ #handler }
+
+Java
+: @@snip
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
{ #handler }
+
+@@@ note { title=Hint }
+Such simple handlers can also be defined as plain functions via the helper
@scala[`R2dbcHandler.apply`]@java[`R2dbcHandler.fromFunction`] factory method.
+@@@
+
+### Grouped handler
+
+When using @ref:[`R2dbcProjection.groupedWithin`](#groupedwithin) the handler
is processing a @scala[`Seq`]@java[`List`] of envelopes.
+
+Scala
+: @@snip
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
{ #grouped-handler }
+
+Java
+: @@snip
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
{ #grouped-handler }
+
+### Stateful handler
+
+The @apidoc[R2dbcHandler] can be stateful, with variables and mutable data
structures. It is invoked by the `Projection` machinery
+one envelope at a time and visibility guarantees between the invocations are
handled automatically, i.e. no volatile
+or other concurrency primitives are needed for managing the state as long as
it's not accessed by other threads
+than the one that called `process`.
+
+@@@ note
+
+It is important that the `Handler` instance is not shared between several
`Projection` instances,
+because then it would be invoked concurrently, which is not how it is intended
to be used. Each `Projection`
+instance should use a new `Handler` instance.
+
+@@@
+
+### Async handler
+
+The @apidoc[Handler] can be used with `R2dbcProjection.atLeastOnceAsync` and
+`R2dbcProjection.groupedWithinAsync` if the handler is not storing the
projection result in the database.
+The handler could send to a Kafka topic or integrate with something else.
+
+There are several examples of such `Handler` in the @ref:[documentation for
Cassandra Projections](cassandra.md#handler).
+Same type of handlers can be used with `R2dbcProjection` instead of
`CassandraProjection`.
+
+### Actor handler
+
+A good alternative for advanced state management is to implement the handler
as an
+@extref:[actor](pekko:typed/typed/actors.html) which is described in
+@ref:[Processing with Actor](actor.md).
+
+### Flow handler
+
+A Pekko Streams `FlowWithContext` can be used instead of a handler for
processing the envelopes,
+which is described in @ref:[Processing with Pekko Streams](flow.md).
+
+### Handler lifecycle
+
+You can override the `start` and `stop` methods of the `R2dbcHandler` to
implement initialization
+before first envelope is processed and resource cleanup when the projection is
stopped.
+Those methods are also called when the `Projection` is restarted after failure.
+
+See also @ref:[error handling](error.md).
+
+## Offset types
+
+The supported offset types of the `R2dbcProjection` are:
+
+* @apidoc[pekko.persistence.query.TimestampOffset] that is used for
@ref:[SourceProvider for
eventsBySlices](eventsourced.md#sourceprovider-for-eventsbyslices) and
@ref:[SourceProvider for
changesBySlices](durable-state.md#sourceprovider-for-changesbyslices)
+* other @apidoc[pekko.persistence.query.Offset] types
+* @apidoc[MergeableOffset] that is used for @ref:[messages from
Kafka](kafka.md#mergeable-offset)
+* `String`
+* `Int`
+* `Long`
+* Any other type that has a configured Pekko Serializer is stored with base64
encoding of the serialized bytes.
+
+## Publish events for lower latency
+
+See @extref:[eventsBySlices
documentation](pekko-persistence-r2dbc:query.html#publish-events-for-lower-latency-of-eventsbyslices).
+
+
+## Multiple plugins
+
+Just like how multiple plugins can be configured as described for @extref[the
R2DBC persistence plugin](pekko-persistence-r2dbc:config.html#multiple-plugins)
multiple projection configurations are possible.
+
+For Projection offset store you need another config section:
+
+@@snip
[conf](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala){#second-projection-config}
+
+Note that the `use-connection-factory` property references the same connection
settings as is used for the `second-r2dbc` plugins, but it could also
+have been a separate connection pool configured as:
+
+@@snip
[conf](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala){#second-projection-config-with-connection-factory}
+
+In that way you can use the default plugins for the write side and Projection
`SourceProvider`, but use a separate database for the Projection
+handlers and offset storage.
+
+You start the Projections with the `ProjectionSettings` loaded from
`"second-projection-r2dbc"`.
+
+Scala
+: @@snip
[Example.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala){#projectionSettings}
+
+Java
+: @@snip
[Example.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java){#projectionSettings}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 970e302..cfa1d16 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -19,6 +19,8 @@ object Dependencies {
val ScalaVersions = Seq(Scala213, Scala3)
val PekkoVersionInDocs = PekkoCoreDependency.default.link
+ // change PekkoPersistenceR2dbcVersionInDocs when 2.0.0-M1 is released
+ val PekkoPersistenceR2dbcVersionInDocs = "current"
val ConnectorsVersionInDocs = PekkoConnectorsDependency.default.link
val ConnectorsKafkaVersionInDocs =
PekkoConnectorsKafkaDependency.default.link
@@ -47,6 +49,7 @@ object Dependencies {
val pekkoPersistenceQuery = "org.apache.pekko" %%
"pekko-persistence-query" % Versions.pekko
val pekkoPersistenceTyped = "org.apache.pekko" %%
"pekko-persistence-typed" % Versions.pekko
val pekkoGrpcRuntime = "org.apache.pekko" %% "pekko-grpc-runtime" %
Versions.pekkoGrpc
+ val pekkoPersistenceR2dbc = "org.apache.pekko" %%
"pekko-persistence-r2dbc" % Versions.pekkoPersistenceR2dbc
// TestKit in compile scope for ProjectionTestKit
val pekkoTypedTestkit = "org.apache.pekko" %% "pekko-actor-testkit-typed"
% Versions.pekko
@@ -61,65 +64,56 @@ object Dependencies {
// must be provided on classpath when using Apache Kafka 2.6.0+
val jackson = "com.fasterxml.jackson.core" % "jackson-databind" %
Versions.jackson
- // not really used in lib code, but in example and test
- val h2Driver = "com.h2database" % "h2" % Versions.h2Driver
-
val r2dbcSpi = "io.r2dbc" % "r2dbc-spi" % "1.0.0.RELEASE"
val r2dbcPool = "io.r2dbc" % "r2dbc-pool" % "1.0.2.RELEASE"
val r2dbcPostgres = "org.postgresql" % "r2dbc-postgresql" % "1.1.1.RELEASE"
val r2dbcMysql = "io.asyncer" % "r2dbc-mysql" % "1.4.2"
}
- object TestNonIt {
- val persistenceTestkit = "org.apache.pekko" %% "pekko-persistence-testkit"
% Versions.pekko % "test"
-
- val scalatest = "org.scalatest" %% "scalatest" % Versions.scalaTest %
"test"
-
- val logback = "ch.qos.logback" % "logback-classic" % Versions.logback %
"test"
- }
-
object Test {
+ val pekkoClusterShardingTyped = Compile.pekkoClusterShardingTyped % "test"
val pekkoDiscovery = "org.apache.pekko" %% "pekko-discovery" %
Versions.pekko % "test"
val pekkoDistributedData = "org.apache.pekko" %% "pekko-distributed-data"
% Versions.pekko % "test"
val pekkoSerializationJackson = "org.apache.pekko" %%
"pekko-serialization-jackson" % Versions.pekko % "test"
- val pekkoTypedTestkit = Compile.pekkoTypedTestkit
- val pekkoStreamTestkit = Compile.pekkoStreamTestkit
+ val pekkoTypedTestkit = Compile.pekkoTypedTestkit % "test"
+ val pekkoStreamTestkit = Compile.pekkoStreamTestkit % "test"
val persistenceTestkit = "org.apache.pekko" %% "pekko-persistence-testkit"
% Versions.pekko % "test"
- val scalatest = "org.scalatest" %% "scalatest" % Versions.scalaTest
+ val scalatest = "org.scalatest" %% "scalatest" % Versions.scalaTest %
"test"
val scalatestJUnit = "org.scalatestplus" %% "junit-4-13" %
(Versions.scalaTest + ".0")
- val junit = "junit" % "junit" % Versions.junit
+ val junit = "junit" % "junit" % Versions.junit % "test"
- val h2Driver = Compile.h2Driver
- val postgresDriver = "org.postgresql" % "postgresql" % "42.7.11"
- val mysqlDriver = "com.mysql" % "mysql-connector-j" % "9.7.0"
- val msSQLServerDriver = "com.microsoft.sqlserver" % "mssql-jdbc" %
"13.4.0.jre11"
- val oracleDriver = "com.oracle.ojdbc" % "ojdbc8" % "19.3.0.0"
+ val h2Driver = "com.h2database" % "h2" % Versions.h2Driver % "test"
+ val postgresDriver = "org.postgresql" % "postgresql" % "42.7.11" % "test"
+ val mysqlDriver = "com.mysql" % "mysql-connector-j" % "9.7.0" % "test"
+ val msSQLServerDriver = "com.microsoft.sqlserver" % "mssql-jdbc" %
"13.4.0.jre11" % "test"
+ val oracleDriver = "com.oracle.ojdbc" % "ojdbc8" % "19.3.0.0" % "test"
- val logback = "ch.qos.logback" % "logback-classic" % Versions.logback
+ val logback = "ch.qos.logback" % "logback-classic" % Versions.logback %
"test"
val cassandraContainer =
- "org.testcontainers" % "testcontainers-cassandra" %
Versions.testContainers
+ "org.testcontainers" % "testcontainers-cassandra" %
Versions.testContainers % "test"
val postgresContainer =
- "org.testcontainers" % "testcontainers-postgresql" %
Versions.testContainers
+ "org.testcontainers" % "testcontainers-postgresql" %
Versions.testContainers % "test"
val mysqlContainer =
- "org.testcontainers" % "testcontainers-mysql" % Versions.testContainers
+ "org.testcontainers" % "testcontainers-mysql" % Versions.testContainers
% "test"
val msSQLServerContainer =
- "org.testcontainers" % "testcontainers-mssqlserver" %
Versions.testContainers
+ "org.testcontainers" % "testcontainers-mssqlserver" %
Versions.testContainers % "test"
val oracleDbContainer =
- "org.testcontainers" % "testcontainers-oracle-xe" %
Versions.testContainers
+ "org.testcontainers" % "testcontainers-oracle-xe" %
Versions.testContainers % "test"
val connectorsKafkaTestkit =
"org.apache.pekko" %% "pekko-connectors-kafka-testkit" %
Versions.connectorsKafka
val r2dbcPostgres = Compile.r2dbcPostgres % "test"
+ val r2dbcMysql = Compile.r2dbcMysql % "test"
}
object Examples {
val hibernate = "org.hibernate" % "hibernate-core" % "7.3.5.Final"
- val pekkoClusterShardingTyped = "org.apache.pekko" %%
"pekko-cluster-sharding-typed" % Versions.pekko
+ val pekkoClusterShardingTyped = Compile.pekkoClusterShardingTyped
val pekkoPersistenceCassandra =
"org.apache.pekko" %% "pekko-persistence-cassandra" %
Versions.pekkoPersistenceCassandra
val pekkoPersistenceJdbc = "org.apache.pekko" %% "pekko-persistence-jdbc"
% Versions.pekkoPersistenceJdbc
@@ -136,30 +130,30 @@ object Dependencies {
// pekko-persistence-query is only needed for OffsetSerialization and to
provide a typed EventEnvelope that
// references the Offset type from pekko-persistence.
Compile.pekkoPersistenceQuery,
- Test.pekkoTypedTestkit % "test",
- Test.logback % "test",
- Test.scalatest % "test")
+ Test.pekkoTypedTestkit,
+ Test.logback,
+ Test.scalatest)
val coreTest =
deps ++= Seq(
- Test.pekkoTypedTestkit % "test",
- Test.pekkoStreamTestkit % "test",
- Test.scalatest % "test",
- Test.scalatestJUnit % "test",
- Test.junit % "test",
- Test.logback % "test")
+ Test.pekkoTypedTestkit,
+ Test.pekkoStreamTestkit,
+ Test.scalatest,
+ Test.scalatestJUnit,
+ Test.junit,
+ Test.logback)
val testKit =
deps ++= Seq(
Compile.pekkoTypedTestkit,
Compile.pekkoStreamTestkit,
- Test.scalatest % "test",
- Test.scalatestJUnit % "test",
- Test.junit % "test",
- Test.logback % "test")
+ Test.scalatest,
+ Test.scalatestJUnit,
+ Test.junit,
+ Test.logback)
val eventsourced =
- deps ++= Seq(Compile.pekkoPersistenceQuery, TestNonIt.persistenceTestkit,
TestNonIt.scalatest, TestNonIt.logback)
+ deps ++= Seq(Compile.pekkoPersistenceQuery, Test.persistenceTestkit,
Test.scalatest, Test.logback)
val state =
deps ++= Seq(Compile.pekkoPersistenceQuery, Test.persistenceTestkit,
Test.pekkoStreamTestkit, Test.scalatest)
@@ -167,50 +161,50 @@ object Dependencies {
val jdbc =
deps ++= Seq(
Compile.pekkoPersistenceQuery,
- Test.pekkoTypedTestkit % "test",
- Test.h2Driver % "test",
- Test.postgresDriver % "test",
- Test.postgresContainer % "test",
- Test.mysqlDriver % "test",
- Test.mysqlContainer % "test",
- Test.msSQLServerDriver % "test",
- Test.msSQLServerContainer % "test",
- Test.oracleDriver % "test",
- Test.oracleDbContainer % "test",
- Test.logback % "test")
+ Test.pekkoTypedTestkit,
+ Test.h2Driver,
+ Test.postgresDriver,
+ Test.postgresContainer,
+ Test.mysqlDriver,
+ Test.mysqlContainer,
+ Test.msSQLServerDriver,
+ Test.msSQLServerContainer,
+ Test.oracleDriver,
+ Test.oracleDbContainer,
+ Test.logback)
val slick =
deps ++= Seq(
Compile.slick,
Compile.pekkoPersistenceQuery,
- Test.pekkoTypedTestkit % "test",
- Test.h2Driver % "test",
- Test.postgresDriver % "test",
- Test.postgresContainer % "test",
- Test.mysqlDriver % "test",
- Test.mysqlContainer % "test",
- Test.msSQLServerDriver % "test",
- Test.msSQLServerContainer % "test",
- Test.oracleDriver % "test",
- Test.oracleDbContainer % "test",
- Test.logback % "test")
+ Test.pekkoTypedTestkit,
+ Test.h2Driver,
+ Test.postgresDriver,
+ Test.postgresContainer,
+ Test.mysqlDriver,
+ Test.mysqlContainer,
+ Test.msSQLServerDriver,
+ Test.msSQLServerContainer,
+ Test.oracleDriver,
+ Test.oracleDbContainer,
+ Test.logback)
val cassandra =
deps ++= Seq(
Compile.connectorsCassandra,
Compile.pekkoPersistenceQuery,
- Test.pekkoTypedTestkit % "test",
- Test.logback % "test",
- Test.cassandraContainer % "test",
- Test.scalatest % "test",
- Test.scalatestJUnit % "test")
+ Test.pekkoTypedTestkit,
+ Test.logback,
+ Test.cassandraContainer,
+ Test.scalatest,
+ Test.scalatestJUnit)
val kafka =
deps ++= Seq(
Compile.connectorsKafka,
Compile.jackson,
- Test.scalatest % "test",
- Test.logback % "test")
+ Test.scalatest,
+ Test.logback)
val grpc =
deps ++= Seq(
@@ -225,62 +219,62 @@ object Dependencies {
val grpcTest =
deps ++= Seq(
- "org.apache.pekko" %% "pekko-projection-r2dbc" %
Versions.pekkoPersistenceR2dbc % "test",
- Test.postgresDriver % "test",
- Compile.pekkoClusterShardingTyped % "test",
+ Test.postgresDriver,
+ Test.pekkoClusterShardingTyped,
Test.pekkoSerializationJackson,
Test.pekkoDiscovery,
- Test.pekkoTypedTestkit % "test",
- Test.pekkoStreamTestkit % "test",
- Test.postgresContainer % "test",
- Test.logback % "test",
- Test.scalatest % "test")
+ Test.pekkoTypedTestkit,
+ Test.pekkoStreamTestkit,
+ Test.postgresContainer,
+ Test.logback,
+ Test.scalatest)
val grpcIntTest =
deps ++= Seq(
- Compile.pekkoClusterShardingTyped % "test",
- Test.postgresDriver % "test",
+ Test.pekkoClusterShardingTyped,
+ Test.postgresDriver,
Test.pekkoSerializationJackson,
Test.pekkoDiscovery,
- Test.pekkoTypedTestkit % "test",
- Test.postgresContainer % "test",
+ Test.pekkoTypedTestkit,
+ Test.postgresContainer,
Test.r2dbcPostgres,
- Test.logback % "test",
- Test.scalatest % "test")
+ Test.logback,
+ Test.scalatest)
val r2dbc =
deps ++= Seq(
- "org.apache.pekko" %% "pekko-persistence-r2dbc" %
Versions.pekkoPersistenceR2dbc,
+ Compile.pekkoPersistenceR2dbc,
Compile.pekkoPersistenceQuery,
Compile.r2dbcSpi,
Compile.r2dbcPool,
Compile.r2dbcPostgres % "provided",
- Compile.r2dbcMysql % "provided")
+ Compile.r2dbcMysql % "provided",
+ Test.pekkoClusterShardingTyped)
val r2dbcIntTest =
deps ++= Seq(
- "org.apache.pekko" %% "pekko-persistence-r2dbc" %
Versions.pekkoPersistenceR2dbc,
+ Compile.pekkoPersistenceR2dbc,
Compile.pekkoPersistenceQuery,
Compile.r2dbcSpi,
Compile.r2dbcPool,
- Compile.r2dbcPostgres % "test",
- Compile.r2dbcMysql % "test",
+ Test.r2dbcPostgres,
+ Test.r2dbcMysql,
Test.pekkoSerializationJackson,
Test.pekkoDiscovery,
Test.pekkoDistributedData,
- Test.pekkoTypedTestkit % "test",
- Test.pekkoStreamTestkit % "test",
- Test.logback % "test",
- Test.scalatest % "test")
+ Test.pekkoTypedTestkit,
+ Test.pekkoStreamTestkit,
+ Test.logback,
+ Test.scalatest)
val kafkaTest =
deps ++= Seq(
- Test.scalatest % "test",
- Test.pekkoTypedTestkit % "test",
- Test.pekkoStreamTestkit % "test",
- Test.connectorsKafkaTestkit % "test",
- Test.logback % "test",
- Test.scalatestJUnit % "test")
+ Test.scalatest,
+ Test.pekkoTypedTestkit,
+ Test.pekkoStreamTestkit,
+ Test.connectorsKafkaTestkit,
+ Test.logback,
+ Test.scalatestJUnit)
val examples =
deps ++= Seq(
@@ -289,8 +283,8 @@ object Dependencies {
Examples.pekkoPersistenceCassandra,
Examples.pekkoPersistenceJdbc,
Examples.hibernate,
- Test.h2Driver % "test",
- Test.pekkoTypedTestkit % "test",
- Test.logback % "test",
- Test.cassandraContainer % "test")
+ Test.h2Driver,
+ Test.pekkoTypedTestkit,
+ Test.logback,
+ Test.cassandraContainer)
}
diff --git a/ddl-scripts/create_tables_postgres.sql
b/r2dbc-int-test/ddl-scripts/create_tables_postgres.sql
similarity index 100%
copy from ddl-scripts/create_tables_postgres.sql
copy to r2dbc-int-test/ddl-scripts/create_tables_postgres.sql
diff --git a/ddl-scripts/create_tables_postgres.sql
b/r2dbc-int-test/ddl-scripts/create_tables_yugabyte.sql
similarity index 82%
rename from ddl-scripts/create_tables_postgres.sql
rename to r2dbc-int-test/ddl-scripts/create_tables_yugabyte.sql
index ebb91c6..af5528c 100644
--- a/ddl-scripts/create_tables_postgres.sql
+++ b/r2dbc-int-test/ddl-scripts/create_tables_yugabyte.sql
@@ -18,11 +18,12 @@ CREATE TABLE IF NOT EXISTS event_journal(
meta_ser_manifest VARCHAR(255),
meta_payload BYTEA,
- PRIMARY KEY(persistence_id, seq_nr)
+ PRIMARY KEY(persistence_id HASH, seq_nr ASC)
);
-- `event_journal_slice_idx` is only needed if the slice based queries are used
-CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice,
entity_type, db_timestamp, seq_nr);
+CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice ASC,
entity_type ASC, db_timestamp ASC, seq_nr ASC, persistence_id, deleted)
+ SPLIT AT VALUES ((127), (255), (383), (511), (639), (767), (895));
CREATE TABLE IF NOT EXISTS snapshot(
slice INT NOT NULL,
@@ -37,7 +38,7 @@ CREATE TABLE IF NOT EXISTS snapshot(
meta_ser_manifest VARCHAR(255),
meta_payload BYTEA,
- PRIMARY KEY(persistence_id)
+ PRIMARY KEY(persistence_id HASH)
);
CREATE TABLE IF NOT EXISTS durable_state (
@@ -52,11 +53,12 @@ CREATE TABLE IF NOT EXISTS durable_state (
state_payload BYTEA NOT NULL,
tags TEXT ARRAY,
- PRIMARY KEY(persistence_id, revision)
+ PRIMARY KEY(persistence_id HASH, revision ASC)
);
-- `durable_state_slice_idx` is only needed if the slice based queries are used
-CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice,
entity_type, db_timestamp, revision);
+CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice ASC,
entity_type ASC, db_timestamp ASC, revision ASC, persistence_id)
+ SPLIT AT VALUES ((127), (255), (383), (511), (639), (767), (895));
-- Primitive offset types are stored in this table.
-- If only timestamp based offsets are used this table is optional.
@@ -72,6 +74,7 @@ CREATE TABLE IF NOT EXISTS projection_offset_store (
);
-- Timestamp based offsets are stored in this table.
+
CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
@@ -83,8 +86,8 @@ CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
-- timestamp_consumed is when the offset was stored
-- the consumer lag is timestamp_consumed - timestamp_offset
timestamp_consumed timestamp with time zone NOT NULL,
- PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
-);
+ PRIMARY KEY(slice ASC, projection_name ASC, timestamp_offset ASC,
persistence_id ASC, seq_nr ASC)
+) SPLIT AT VALUES ((127), (255), (383), (511), (639), (767), (895));
CREATE TABLE IF NOT EXISTS projection_management (
projection_name VARCHAR(255) NOT NULL,
diff --git
a/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java
b/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java
new file mode 100644
index 0000000..84a61c9
--- /dev/null
+++ b/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.projection;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.cluster.sharding.typed.javadsl.EntityTypeKey;
+import org.apache.pekko.japi.Pair;
+import org.apache.pekko.persistence.query.Offset;
+import org.apache.pekko.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
+import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider;
+import org.apache.pekko.projection.javadsl.SourceProvider;
+
+import docs.home.CborSerializable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+// #handler
+// #grouped-handler
+import org.apache.pekko.projection.r2dbc.javadsl.R2dbcHandler;
+import org.apache.pekko.projection.r2dbc.javadsl.R2dbcSession;
+import io.r2dbc.spi.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+// #grouped-handler
+// #handler
+
+// #initProjections
+import org.apache.pekko.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
+import org.apache.pekko.projection.ProjectionBehavior;
+import org.apache.pekko.persistence.query.typed.EventEnvelope;
+import org.apache.pekko.projection.Projection;
+// #initProjections
+
+// #exactlyOnce
+// #atLeastOnce
+// #grouped
+// #initProjections
+import org.apache.pekko.projection.ProjectionId;
+import org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings;
+import org.apache.pekko.projection.r2dbc.javadsl.R2dbcProjection;
+
+// #initProjections
+// #grouped
+// #atLeastOnce
+// #exactlyOnce
+
+@SuppressWarnings({"unused", "InnerClassMayBeStatic"})
+class R2dbcProjectionDocExample {
+
+ static class ShoppingCart {
+ public static EntityTypeKey<Command> ENTITY_TYPE_KEY =
+ EntityTypeKey.create(Command.class, "ShoppingCart");
+
+ interface Command extends CborSerializable {}
+
+ interface Event {
+ String getCartId();
+ }
+
+ public static class CheckedOut implements Event {
+
+ public final String cartId;
+ public final Instant eventTime;
+
+ public CheckedOut(String cartId, Instant eventTime) {
+ this.cartId = cartId;
+ this.eventTime = eventTime;
+ }
+
+ public String getCartId() {
+ return cartId;
+ }
+
+ @Override
+ public String toString() {
+ return "CheckedOut(" + cartId + "," + eventTime + ")";
+ }
+ }
+ }
+
+ // #handler
+ public class ShoppingCartHandler extends
R2dbcHandler<EventEnvelope<ShoppingCart.Event>> {
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Override
+ public CompletionStage<Done> process(
+ R2dbcSession session, EventEnvelope<ShoppingCart.Event> envelope) {
+ ShoppingCart.Event event = envelope.event();
+ if (event instanceof ShoppingCart.CheckedOut) {
+ ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
+ logger.info(
+ "Shopping cart {} was checked out at {}", checkedOut.cartId,
checkedOut.eventTime);
+
+ Statement stmt =
+ session
+ .createStatement("INSERT into order (id, time) VALUES ($1,
$2)")
+ .bind(0, checkedOut.cartId)
+ .bind(1, checkedOut.eventTime);
+ return session.updateOne(stmt).thenApply(rowsUpdated ->
Done.getInstance());
+
+ } else {
+ logger.debug("Shopping cart {} changed by {}", event.getCartId(),
event);
+ return CompletableFuture.completedFuture(Done.getInstance());
+ }
+ }
+ }
+
+ // #handler
+
+ // #grouped-handler
+ public class GroupedShoppingCartHandler
+ extends R2dbcHandler<List<EventEnvelope<ShoppingCart.Event>>> {
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Override
+ public CompletionStage<Done> process(
+ R2dbcSession session, List<EventEnvelope<ShoppingCart.Event>>
envelopes) {
+ List<Statement> stmts = new ArrayList<>();
+ for (EventEnvelope<ShoppingCart.Event> envelope : envelopes) {
+ ShoppingCart.Event event = envelope.event();
+ if (event instanceof ShoppingCart.CheckedOut) {
+ ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
+ logger.info(
+ "Shopping cart {} was checked out at {}", checkedOut.cartId,
checkedOut.eventTime);
+
+ Statement stmt =
+ session
+ .createStatement("INSERT into order (id, time) VALUES ($1,
$2)")
+ .bind(0, checkedOut.cartId)
+ .bind(1, checkedOut.eventTime);
+ stmts.add(stmt);
+ } else {
+ logger.debug("Shopping cart {} changed by {}", event.getCartId(),
event);
+ }
+ }
+
+ return session.update(stmts).thenApply(rowsUpdated ->
Done.getInstance());
+ }
+ }
+
+ // #grouped-handler
+
+ ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
+
+ // #initProjections
+ void initProjections() {
+ // Split the slices into 4 ranges
+ int numberOfSliceRanges = 4;
+ List<Pair<Integer, Integer>> sliceRanges =
+ EventSourcedProvider.sliceRanges(
+ system, R2dbcReadJournal.Identifier(), numberOfSliceRanges);
+
+ ShardedDaemonProcess.get(system)
+ .init(
+ ProjectionBehavior.Command.class,
+ "ShoppingCartProjection",
+ sliceRanges.size(),
+ i ->
ProjectionBehavior.create(createProjection(sliceRanges.get(i))),
+ ProjectionBehavior.stopMessage());
+ }
+
+ Projection<EventEnvelope<ShoppingCart.Event>> createProjection(
+ Pair<Integer, Integer> sliceRange) {
+ int minSlice = sliceRange.first();
+ int maxSlice = sliceRange.second();
+
+ String entityType = ShoppingCart.ENTITY_TYPE_KEY.name();
+
+ SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
+ EventSourcedProvider.eventsBySlices(
+ system, R2dbcReadJournal.Identifier(), entityType, minSlice,
maxSlice);
+
+ ProjectionId projectionId =
+ ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
+ Optional<R2dbcProjectionSettings> settings = Optional.empty();
+
+ return R2dbcProjection.exactlyOnce(
+ projectionId, settings, sourceProvider, ShoppingCartHandler::new,
system);
+ }
+
+ // #initProjections
+
+ // #sourceProvider
+ // Split the slices into 4 ranges
+ int numberOfSliceRanges = 4;
+ List<Pair<Integer, Integer>> sliceRanges =
+ EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier(),
numberOfSliceRanges);
+
+ // Example of using the first slice range
+ int minSlice = sliceRanges.get(0).first();
+ int maxSlice = sliceRanges.get(0).second();
+ String entityType = ShoppingCart.ENTITY_TYPE_KEY.name();
+
+ SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
+ EventSourcedProvider.eventsBySlices(
+ system, R2dbcReadJournal.Identifier(), entityType, minSlice,
maxSlice);
+
+ // #sourceProvider
+
+ {
+ // #exactlyOnce
+ ProjectionId projectionId =
+ ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
+
+ Optional<R2dbcProjectionSettings> settings = Optional.empty();
+
+ Projection<EventEnvelope<ShoppingCart.Event>> projection =
+ R2dbcProjection.exactlyOnce(
+ projectionId, settings, sourceProvider, ShoppingCartHandler::new,
system);
+ // #exactlyOnce
+ }
+
+ {
+ // #atLeastOnce
+ ProjectionId projectionId =
+ ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
+
+ Optional<R2dbcProjectionSettings> settings = Optional.empty();
+
+ int saveOffsetAfterEnvelopes = 100;
+ Duration saveOffsetAfterDuration = Duration.ofMillis(500);
+
+ Projection<EventEnvelope<ShoppingCart.Event>> projection =
+ R2dbcProjection.atLeastOnce(
+ projectionId, settings, sourceProvider,
ShoppingCartHandler::new, system)
+ .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
+ // #atLeastOnce
+ }
+
+ {
+ // #grouped
+ ProjectionId projectionId =
+ ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
+
+ Optional<R2dbcProjectionSettings> settings = Optional.empty();
+
+ int saveOffsetAfterEnvelopes = 100;
+ Duration saveOffsetAfterDuration = Duration.ofMillis(500);
+
+ Projection<EventEnvelope<ShoppingCart.Event>> projection =
+ R2dbcProjection.groupedWithin(
+ projectionId, settings, sourceProvider,
GroupedShoppingCartHandler::new, system)
+ .withGroup(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
+ // #grouped
+ }
+
+ {
+ // #projectionSettings
+ ProjectionId projectionId =
+ ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
+
+ Optional<R2dbcProjectionSettings> settings =
+ Optional.of(
+ R2dbcProjectionSettings.create(
+
system.settings().config().getConfig("second-projection-r2dbc")));
+
+ Projection<EventEnvelope<ShoppingCart.Event>> projection =
+ R2dbcProjection.atLeastOnce(
+ projectionId, settings, sourceProvider, ShoppingCartHandler::new,
system);
+ // #projectionSettings
+ }
+}
diff --git a/r2dbc/src/test/scala/docs/home/CborSerializable.scala
b/r2dbc/src/test/scala/docs/home/CborSerializable.scala
new file mode 100644
index 0000000..3ccbac4
--- /dev/null
+++ b/r2dbc/src/test/scala/docs/home/CborSerializable.scala
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home
+
+trait CborSerializable
diff --git
a/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
b/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
new file mode 100644
index 0000000..2e0a171
--- /dev/null
+++ b/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.projection
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.Behaviors
+import pekko.cluster.sharding.typed.scaladsl.EntityTypeKey
+import pekko.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
+import pekko.persistence.query.Offset
+import pekko.projection.r2dbc.R2dbcProjectionSettings
+import docs.home.CborSerializable
+import org.slf4j.LoggerFactory
+
+import java.time.Instant
+import scala.annotation.nowarn
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+//#handler
+//#grouped-handler
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.projection.r2dbc.scaladsl.R2dbcHandler
+import pekko.projection.r2dbc.scaladsl.R2dbcSession
+
+//#grouped-handler
+//#handler
+object R2dbcProjectionDocExample {
+
+ object ShoppingCart {
+ val EntityKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("ShoppingCart")
+
+ sealed trait Command extends CborSerializable
+
+ sealed trait Event extends CborSerializable {
+ def cartId: String
+ }
+
+ final case class ItemAdded(cartId: String, itemId: String, quantity: Int)
extends Event
+ final case class ItemRemoved(cartId: String, itemId: String) extends Event
+ final case class ItemQuantityAdjusted(cartId: String, itemId: String,
newQuantity: Int) extends Event
+ final case class CheckedOut(cartId: String, eventTime: Instant) extends
Event
+ }
+
+ // #handler
+ class ShoppingCartHandler()(implicit ec: ExecutionContext) extends
R2dbcHandler[EventEnvelope[ShoppingCart.Event]] {
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ override def process(session: R2dbcSession, envelope:
EventEnvelope[ShoppingCart.Event]): Future[Done] = {
+ envelope.event match {
+ case ShoppingCart.CheckedOut(cartId, time) =>
+ logger.info(s"Shopping cart $cartId was checked out at $time")
+ val stmt = session
+ .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
+ .bind(0, cartId)
+ .bind(1, time)
+ session
+ .updateOne(stmt)
+ .map(_ => Done)
+
+ case otherEvent =>
+ logger.debug(s"Shopping cart ${otherEvent.cartId} changed by
$otherEvent")
+ Future.successful(Done)
+ }
+ }
+ }
+ // #handler
+
+ // #grouped-handler
+ import scala.collection.immutable
+
+ class GroupedShoppingCartHandler()(implicit ec: ExecutionContext)
+ extends R2dbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
+ private val logger = LoggerFactory.getLogger(getClass)
+
+ override def process(
+ session: R2dbcSession,
+ envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]):
Future[Done] = {
+
+ // save all events in DB
+ val stmts = envelopes
+ .map(_.event)
+ .collect {
+ case ShoppingCart.CheckedOut(cartId, time) =>
+ logger.info(s"Shopping cart $cartId was checked out at $time")
+
+ session
+ .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
+ .bind(0, cartId)
+ .bind(1, time)
+
+ }
+ .toVector
+
+ session.update(stmts).map(_ => Done)
+ }
+ }
+ // #grouped-handler
+
+ implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty,
"Example")
+ implicit val ec: ExecutionContext = system.executionContext
+
+ object IllustrateInit {
+ // #initProjections
+ import pekko.persistence.query.typed.EventEnvelope
+ import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+ import pekko.projection.Projection
+ import pekko.projection.ProjectionBehavior
+ import pekko.projection.ProjectionId
+ import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
+ import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+ import pekko.projection.scaladsl.SourceProvider
+
+ def initProjections(): Unit = {
+ def sourceProvider(sliceRange: Range): SourceProvider[Offset,
EventEnvelope[ShoppingCart.Event]] =
+ EventSourcedProvider
+ .eventsBySlices[ShoppingCart.Event](
+ system,
+ readJournalPluginId = R2dbcReadJournal.Identifier,
+ entityType,
+ sliceRange.min,
+ sliceRange.max)
+
+ def projection(sliceRange: Range):
Projection[EventEnvelope[ShoppingCart.Event]] = {
+ val minSlice = sliceRange.min
+ val maxSlice = sliceRange.max
+ val projectionId = ProjectionId("ShoppingCarts",
s"carts-$minSlice-$maxSlice")
+
+ R2dbcProjection
+ .exactlyOnce(
+ projectionId,
+ settings = None,
+ sourceProvider(sliceRange),
+ handler = () => new ShoppingCartHandler)
+ }
+
+ // Split the slices into 4 ranges
+ val numberOfSliceRanges: Int = 4
+ val sliceRanges = EventSourcedProvider.sliceRanges(system,
R2dbcReadJournal.Identifier, numberOfSliceRanges)
+
+ ShardedDaemonProcess(system).init(
+ name = "ShoppingCartProjection",
+ numberOfInstances = sliceRanges.size,
+ behaviorFactory = i => ProjectionBehavior(projection(sliceRanges(i))),
+ stopMessage = ProjectionBehavior.Stop)
+ }
+ // #initProjections
+ }
+
+ // #sourceProvider
+ import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+ import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
+ import pekko.projection.scaladsl.SourceProvider
+
+ // Slit the slices into 4 ranges
+ val numberOfSliceRanges: Int = 4
+ val sliceRanges = EventSourcedProvider.sliceRanges(system,
R2dbcReadJournal.Identifier, numberOfSliceRanges)
+
+ // Example of using the first slice range
+ val minSlice: Int = sliceRanges.head.min
+ val maxSlice: Int = sliceRanges.head.max
+ val entityType: String = ShoppingCart.EntityKey.name
+
+ val sourceProvider: SourceProvider[Offset,
EventEnvelope[ShoppingCart.Event]] =
+ EventSourcedProvider
+ .eventsBySlices[ShoppingCart.Event](
+ system,
+ readJournalPluginId = R2dbcReadJournal.Identifier,
+ entityType,
+ minSlice,
+ maxSlice)
+ // #sourceProvider
+
+ object IllustrateExactlyOnce {
+ // #exactlyOnce
+ import pekko.projection.ProjectionId
+ import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+
+ val projectionId = ProjectionId("ShoppingCarts",
s"carts-$minSlice-$maxSlice")
+
+ val projection =
+ R2dbcProjection
+ .exactlyOnce(projectionId, settings = None, sourceProvider, handler =
() => new ShoppingCartHandler)
+ // #exactlyOnce
+ }
+
+ object IllustrateAtLeastOnce {
+ // #atLeastOnce
+ import pekko.projection.ProjectionId
+ import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+
+ val projectionId = ProjectionId("ShoppingCarts",
s"carts-$minSlice-$maxSlice")
+
+ val projection =
+ R2dbcProjection
+ .atLeastOnce(projectionId, settings = None, sourceProvider, handler =
() => new ShoppingCartHandler)
+ .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
+ // #atLeastOnce
+ }
+
+ object IllustrateGrouped {
+ // #grouped
+ import pekko.projection.ProjectionId
+ import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+
+ val projectionId = ProjectionId("ShoppingCarts",
s"carts-$minSlice-$maxSlice")
+
+ val projection =
+ R2dbcProjection
+ .groupedWithin(projectionId, settings = None, sourceProvider, handler
= () => new GroupedShoppingCartHandler)
+ .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
+ // #grouped
+ }
+
+ // Ignore Scala 2.13 compiler warning
+ @nowarn("msg=possible missing interpolator")
+ object IllustrateSettings {
+ val config =
+ """
+ // #second-projection-config
+ second-projection-r2dbc = ${pekko.projection.r2dbc}
+ second-projection-r2dbc {
+ offset-store {
+ # specific projection offset store properties here
+ }
+ use-connection-factory = "second-r2dbc.connection-factory"
+ }
+ // #second-projection-config
+
+ // #second-projection-config-with-connection-factory
+ second-projection-r2dbc = ${pekko.projection.r2dbc}
+ second-projection-r2dbc {
+ connection-factory = ${pekko.persistence.r2dbc.connection-factory}
+ connection-factory {
+ # specific connection properties for offset store and projection
handler here
+ }
+
+ offset-store {
+ # specific projection offset store properties here
+ }
+ use-connection-factory = "second-projection-r2dbc.connection-factory"
+ }
+ // #second-projection-config-with-connection-factory
+ """
+
+ // #projectionSettings
+
+ import pekko.projection.ProjectionId
+ import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+
+ val projectionId = ProjectionId("ShoppingCarts",
s"carts-$minSlice-$maxSlice")
+
+ val settings =
Some(R2dbcProjectionSettings(system.settings.config.getConfig("second-projection-r2dbc")))
+
+ val projection =
+ R2dbcProjection
+ .atLeastOnce(projectionId, settings = None, sourceProvider, handler =
() => new ShoppingCartHandler)
+ // #projectionSettings
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]