This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new a593069 Port akka-persistence-r2dbc PR#239: add JSONB/JSON payload
column type support (#355)
a593069 is described below
commit a593069f371a7fe3c40a1815476021fc96b472d2
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 18:11:41 2026 +0100
Port akka-persistence-r2dbc PR#239: add JSONB/JSON payload column type
support (#355)
* Port akka-persistence-r2dbc PR#239: add JSONB/JSON payload column type
support
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/2b268242-2841-4d1d-8983-0a15eca8890a
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* Port akka-persistence-r2dbc PR#388: nonePayload, bindPayloadOption,
PayloadSpec test
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/6ce25a3f-8305-4ade-bbda-e17e51fe134d
Co-authored-by: pjfanning <[email protected]>
* Fix PayloadSpec: remove invalid row1 assertion for delete-before-persist
case
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/66a2b6dc-2582-4911-8428-ef2187f45089
Co-authored-by: pjfanning <[email protected]>
* Fix PayloadSpec: restructure store delete marker test to persist-first to
avoid Pekko 1.3.0 intermittent failure
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/6db8d3b2-eac7-413b-9f05-3e4d313fa6f9
Co-authored-by: pjfanning <[email protected]>
* Update build-test.yml
* Update build-test.yml
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.github/workflows/build-test.yml | 47 ++++
core/src/main/resources/reference.conf | 11 +
.../pekko/persistence/r2dbc/R2dbcSettings.scala | 22 ++
.../r2dbc/internal/EventsByPersistenceIdDao.scala | 6 +-
.../persistence/r2dbc/internal/PayloadCodec.scala | 63 ++++++
.../persistence/r2dbc/journal/JournalDao.scala | 7 +-
.../r2dbc/query/scaladsl/QueryDao.scala | 7 +-
.../persistence/r2dbc/snapshot/SnapshotDao.scala | 42 ++--
.../r2dbc/state/scaladsl/DurableStateDao.scala | 17 +-
.../pekko/persistence/r2dbc/JsonSerializable.scala | 16 ++
.../pekko/persistence/r2dbc/PayloadSpec.scala | 252 +++++++++++++++++++++
.../pekko/persistence/r2dbc/TestActors.scala | 6 +-
.../pekko/persistence/r2dbc/TestConfig.scala | 1 +
.../r2dbc/journal/PersistTimestampSpec.scala | 5 +-
.../query/EventsBySliceBacktrackingSpec.scala | 5 +-
ddl-scripts/create_tables_postgres_jsonb.sql | 97 ++++++++
.../r2dbc/EventSourcedEndToEndSpec.scala | 6 +-
17 files changed, 575 insertions(+), 35 deletions(-)
diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index d00af47..4914c36 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -90,6 +90,53 @@ jobs:
- name: test
run: sbt ++${{ matrix.SCALA_VERSION }} test
+ test-postgres-jsonb: # CI job: runs PayloadSpec against Postgres with the JSONB table definitions
+ name: Run test with Postgres (JSONB)
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ SCALA_VERSION: [ 2.13, 3.3 ]
+ JAVA_VERSION: [ 17 ]
+ if: github.repository == 'apache/pekko-persistence-r2dbc' # job only runs when the repository is apache/pekko-persistence-r2dbc
+ steps:
+ - name: Checkout
+ uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd #
v6.0.2
+ with:
+ fetch-depth: 0
+ fetch-tags: true
+
+ - name: Checkout GitHub merge # for pull requests: fetch the PR merge ref into a local scratch branch and test that
+ if: github.event.pull_request
+ run: |-
+ git fetch origin pull/${{ github.event.pull_request.number
}}/merge:scratch
+ git checkout scratch
+
+ - name: Setup Java ${{ matrix.JAVA_VERSION }}
+ uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 #
v5.2.0
+ with:
+ distribution: temurin
+ java-version: ${{ matrix.JAVA_VERSION }}
+
+ - name: Install sbt
+ uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+
+ - name: Cache Coursier cache
+ uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 #
6.4.8.1.0
+
+ - name: Enable jvm-opts
+ run: cp .jvmopts-ci .jvmopts
+
+ - name: Start DB # starts the compose-defined Postgres, waits a fixed 10s, then loads the JSONB DDL script via psql
+ run: |-
+ docker compose -f docker/docker-compose-postgres.yml up -d
+ # TODO: could we poll the port instead of sleep?
+ sleep 10
+ docker exec -i docker-postgres-db-1 psql -U postgres -t <
ddl-scripts/create_tables_postgres_jsonb.sql
+
+ - name: test # only PayloadSpec is run, with journal, snapshot and state payload-column-type all set to JSONB
+ run: |-
+ sbt -Dpekko.persistence.r2dbc.journal.payload-column-type=JSONB
-Dpekko.persistence.r2dbc.snapshot.payload-column-type=JSONB
-Dpekko.persistence.r2dbc.state.payload-column-type=JSONB "core/testOnly
*persistence.r2dbc.PayloadSpec"
+
test-yugabyte:
name: Run tests with Yugabyte
runs-on: ubuntu-latest
diff --git a/core/src/main/resources/reference.conf
b/core/src/main/resources/reference.conf
index dac6e01..ecd8a7f 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -8,6 +8,9 @@ pekko.persistence.r2dbc {
# name of the table to use for events
table = "event_journal"
+ # the column type to use for event payloads (BYTEA, JSON or JSONB)
+ payload-column-type = "BYTEA"
+
# Otherwise it would be a pinned dispatcher, see
https://github.com/akka/akka/issues/31058
plugin-dispatcher = "pekko.actor.default-dispatcher"
@@ -59,6 +62,9 @@ pekko.persistence.r2dbc {
class = "org.apache.pekko.persistence.r2dbc.snapshot.R2dbcSnapshotStore"
table = "snapshot"
+ # the column type to use for snapshot payloads (BYTEA, JSON or JSONB)
+ payload-column-type = "BYTEA"
+
# Otherwise it would be a pinned dispatcher, see
https://github.com/akka/akka/issues/31058
plugin-dispatcher = "pekko.actor.default-dispatcher"
@@ -81,6 +87,9 @@ pekko.persistence.r2dbc {
table = "durable_state"
+ # the column type to use for durable state payloads (BYTEA, JSON or JSONB)
+ payload-column-type = "BYTEA"
+
# When this is enabled the updates verifies that the revision is +1 of
# previous revision. There might be a small performance gain if
# this is disabled.
@@ -121,6 +130,8 @@ pekko.persistence.r2dbc {
table = ${pekko.persistence.r2dbc.journal.table}
+ payload-column-type =
${pekko.persistence.r2dbc.journal.payload-column-type}
+
publish-events = ${pekko.persistence.r2dbc.journal.publish-events}
# When journal publish-events is enabled a best effort deduplication can
be enabled by setting
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index ff1a234..93402a6 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -21,6 +21,7 @@ import scala.jdk.DurationConverters._
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.annotation.InternalStableApi
+import pekko.persistence.r2dbc.internal.PayloadCodec
import pekko.util.Helpers.toRootLowerCase
import com.typesafe.config.Config
@@ -63,6 +64,9 @@ final class JournalSettings(val config: Config) extends
ConnectionSettings with
val journalTable: String = config.getString("table")
val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") +
journalTable
+
+ val journalPayloadCodec: PayloadCodec =
+ if (useJsonPayload("payload-column-type")) PayloadCodec.JsonCodec else
PayloadCodec.ByteArrayCodec
}
/**
@@ -82,6 +86,9 @@ final class SnapshotSettings(val config: Config) extends
ConnectionSettings with
val snapshotsTable: String = config.getString("table")
val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") +
snapshotsTable
+
+ val snapshotPayloadCodec: PayloadCodec =
+ if (useJsonPayload("payload-column-type")) PayloadCodec.JsonCodec else
PayloadCodec.ByteArrayCodec
}
/**
@@ -105,6 +112,9 @@ final class StateSettings(val config: Config) extends
ConnectionSettings with Us
val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("")
+ durableStateTable
val durableStateAssertSingleWriter: Boolean =
config.getBoolean("assert-single-writer")
+
+ val durableStatePayloadCodec: PayloadCodec =
+ if (useJsonPayload("payload-column-type")) PayloadCodec.JsonCodec else
PayloadCodec.ByteArrayCodec
}
/**
@@ -127,6 +137,9 @@ final class QuerySettings(val config: Config) extends
ConnectionSettings with Us
val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") +
journalTable
val deduplicateCapacity: Int = config.getInt("deduplicate-capacity")
+
+ val journalPayloadCodec: PayloadCodec =
+ if (useJsonPayload("payload-column-type")) PayloadCodec.JsonCodec else
PayloadCodec.ByteArrayCodec
}
/**
@@ -154,6 +167,15 @@ trait ConnectionSettings {
case "off" => -1.millis
case _ => config.getDuration("log-db-calls-exceeding").toScala
}
+
+ protected def useJsonPayload(configKey: String): Boolean = // true => JSON/JSONB payload column, false => BYTEA
+ toRootLowerCase(config.getString(configKey)) match { // case-insensitive and independent of the JVM default locale
+ case "bytea" => false
+ case "jsonb" | "json" => true
+ case _ => // anything else is a configuration error; report the value exactly as configured
+ throw new IllegalStateException(
+ s"Expected $configKey to be one of 'BYTEA', 'JSON' or 'JSONB' but found '${config.getString(configKey)}'")
+ }
}
/**
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
index 3664e92..3862ce0 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
@@ -20,6 +20,8 @@ import pekko.NotUsed
import pekko.annotation.InternalApi
import pekko.persistence.r2dbc.BufferSize
import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
import pekko.persistence.r2dbc.journal.JournalDao.readMetadata
@@ -61,6 +63,8 @@ private[r2dbc] trait EventsByPersistenceIdDao {
protected def settings: BufferSize
+ implicit protected def journalPayloadCodec: PayloadCodec
+
private lazy val selectEventsSql = sql"""
SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest,
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest,
meta_payload, tags
from $journalTable
@@ -141,7 +145,7 @@ private[r2dbc] trait EventsByPersistenceIdDao {
seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
- payload = Some(row.get("event_payload", classOf[Array[Byte]])),
+ payload = Some(row.getPayload("event_payload")),
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = row.get("writer", classOf[String]),
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PayloadCodec.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PayloadCodec.scala
new file mode 100644
index 0000000..f15f441
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PayloadCodec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.pekko.annotation.InternalApi
+import io.r2dbc.postgresql.codec.Json
+import io.r2dbc.spi.Row
+import io.r2dbc.spi.Statement
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] sealed trait PayloadCodec { // converts between serialized payload bytes and the value bound to / read from a payload column
+ def payloadClass: Class[_] // class passed to Row.get when the payload column is read
+ def encode(bytes: Array[Byte]): Any // serialized bytes -> value handed to Statement.bind
+ def decode(payload: Any): Array[Byte] // value read from the row -> serialized bytes
+ def nonePayload: Array[Byte] // bytes bound in place of an absent payload (see bindPayloadOption)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object PayloadCodec { // the two codec implementations plus implicit Statement/Row syntax that applies the codec in scope
+ case object ByteArrayCodec extends PayloadCodec { // byte-array column: values pass through unchanged
+ override def payloadClass: Class[Array[Byte]] = classOf[Array[Byte]]
+ override def encode(bytes: Array[Byte]): Array[Byte] = bytes
+ override def decode(payload: Any): Array[Byte] =
payload.asInstanceOf[Array[Byte]]
+ override def nonePayload: Array[Byte] = Array.emptyByteArray // absent payload is an empty byte array
+ }
+ case object JsonCodec extends PayloadCodec { // payload is exchanged with the driver as io.r2dbc.postgresql.codec.Json
+ override def payloadClass: Class[Json] = classOf[Json]
+ override def encode(bytes: Array[Byte]): Json = Json.of(bytes)
+ override def decode(payload: Any): Array[Byte] =
+ if (payload == null) null else payload.asInstanceOf[Json].asArray() // a null column value decodes to null
+ override val nonePayload: Array[Byte] = "{}".getBytes(UTF_8) // absent payload is the empty JSON object
+ }
+ implicit class RichStatement(val statement: Statement)(implicit codec:
PayloadCodec) extends AnyRef {
+ def bindPayload(index: Int, payload: Array[Byte]): Statement = // binds the payload after encoding it with the implicit codec
+ statement.bind(index, codec.encode(payload))
+
+ def bindPayloadOption(index: Int, payloadOption: Option[Array[Byte]]):
Statement =
+ payloadOption match {
+ case Some(payload) => bindPayload(index, payload)
+ case None => bindPayload(index, codec.nonePayload) // no payload: bind the codec's placeholder instead
+ }
+ }
+ implicit class RichRow(val row: Row)(implicit codec: PayloadCodec) extends
AnyRef {
+ def getPayload(name: String): Array[Byte] = codec.decode(row.get(name,
codec.payloadClass)) // reads the column as codec.payloadClass and decodes it to bytes
+ }
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index c61d154..5be1579 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -27,6 +27,8 @@ import pekko.persistence.r2dbc.JournalSettings
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichStatement
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
@@ -112,6 +114,7 @@ private[r2dbc] class JournalDao(val settings:
JournalSettings, connectionFactory
protected val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log,
settings.logDbCallsExceeding)(ec, system)
protected val journalTable: String = settings.journalTableWithSchema
+ protected implicit val journalPayloadCodec: PayloadCodec =
settings.journalPayloadCodec
protected val (insertEventWithParameterTimestampSql: String,
insertEventWithTransactionTimestampSql: String) = {
val baseSql =
@@ -189,7 +192,7 @@ private[r2dbc] class JournalDao(val settings:
JournalSettings, connectionFactory
.bind(5, "") // FIXME event adapter
.bind(6, write.serId)
.bind(7, write.serManifest)
- .bind(8, write.payload.get)
+ .bindPayload(8, write.payload.get)
if (write.tags.isEmpty)
stmt.bindNull(9, classOf[Array[String]])
@@ -290,7 +293,7 @@ private[r2dbc] class JournalDao(val settings:
JournalSettings, connectionFactory
.bind(5, "")
.bind(6, 0)
.bind(7, "")
- .bind(8, Array.emptyByteArray)
+ .bindPayloadOption(8, None)
.bind(9, true)
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index b79f7ac..c23bcff 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -32,6 +32,8 @@ import
pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
import pekko.persistence.r2dbc.internal.InstantFactory
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.JournalDao
@@ -82,6 +84,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
protected lazy val statementTimestampSql: String = "statement_timestamp()"
protected val journalTable = settings.journalTableWithSchema
+ protected implicit val journalPayloadCodec: PayloadCodec =
settings.journalPayloadCodec
private val currentDbTimestampSql =
"SELECT transaction_timestamp() AS db_timestamp"
@@ -224,7 +227,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
- payload = Some(row.get("event_payload", classOf[Array[Byte]])),
+ payload = Some(row.getPayload("event_payload")),
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = "", // not need in this query
@@ -310,7 +313,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings,
connectionFactory: Co
seqNr,
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
- payload = Some(row.get("event_payload", classOf[Array[Byte]])),
+ payload = Some(row.getPayload("event_payload")),
serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = "", // not need in this query
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
index 7cf77dc..de0cdf3 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
@@ -23,6 +23,9 @@ import pekko.persistence.SnapshotSelectionCriteria
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.SnapshotSettings
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichStatement
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.snapshot.mysql.MySQLSnapshotDao
@@ -50,24 +53,6 @@ private[r2dbc] object SnapshotDao {
final case class SerializedSnapshotMetadata(payload: Array[Byte],
serializerId: Int, serializerManifest: String)
- private def collectSerializedSnapshot(row: Row): SerializedSnapshotRow =
- SerializedSnapshotRow(
- row.get("persistence_id", classOf[String]),
- row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
- row.get[java.lang.Long]("write_timestamp", classOf[java.lang.Long]),
- row.get("snapshot", classOf[Array[Byte]]),
- row.get[Integer]("ser_id", classOf[Integer]),
- row.get("ser_manifest", classOf[String]), {
- val metaSerializerId = row.get[Integer]("meta_ser_id",
classOf[Integer])
- if (metaSerializerId eq null) None
- else
- Some(
- SerializedSnapshotMetadata(
- row.get("meta_payload", classOf[Array[Byte]]),
- metaSerializerId,
- row.get("meta_ser_manifest", classOf[String])))
- })
-
def fromConfig(
settings: SnapshotSettings,
config: Config
@@ -100,6 +85,25 @@ private[r2dbc] class SnapshotDao(settings:
SnapshotSettings, connectionFactory:
protected val snapshotTable: String = settings.snapshotsTableWithSchema
private val persistenceExt = Persistence(system)
private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log,
settings.logDbCallsExceeding)(ec, system)
+ protected implicit val snapshotPayloadCodec: PayloadCodec =
settings.snapshotPayloadCodec
+
+ private def collectSerializedSnapshot(row: Row): SerializedSnapshotRow = // maps one result row to a SerializedSnapshotRow
+ SerializedSnapshotRow(
+ row.get("persistence_id", classOf[String]),
+ row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
+ row.get[java.lang.Long]("write_timestamp", classOf[java.lang.Long]),
+ row.getPayload("snapshot"), // read through the implicit PayloadCodec in scope (snapshotPayloadCodec)
+ row.get[Integer]("ser_id", classOf[Integer]),
+ row.get("ser_manifest", classOf[String]), {
+ val metaSerializerId = row.get[Integer]("meta_ser_id",
classOf[Integer])
+ if (metaSerializerId eq null) None // NULL meta_ser_id: the row has no metadata
+ else
+ Some(
+ SerializedSnapshotMetadata(
+ row.get("meta_payload", classOf[Array[Byte]]), // metadata is read as a plain byte array, not through the codec
+ metaSerializerId,
+ row.get("meta_ser_manifest", classOf[String])))
+ })
protected val upsertSql = sql"""
INSERT INTO $snapshotTable
@@ -211,7 +215,7 @@ private[r2dbc] class SnapshotDao(settings:
SnapshotSettings, connectionFactory:
.bind(2, serializedRow.persistenceId)
.bind(3, serializedRow.seqNr)
.bind(4, serializedRow.writeTimestamp)
- .bind(5, serializedRow.snapshot)
+ .bindPayload(5, serializedRow.snapshot)
.bind(6, serializedRow.serializerId)
.bind(7, serializedRow.serializerManifest)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 2b988de..8b45992 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -14,6 +14,7 @@
package org.apache.pekko.persistence.r2dbc.state.scaladsl
import java.time.Instant
+import java.util
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.{ Duration, FiniteDuration }
@@ -31,6 +32,9 @@ import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import pekko.persistence.r2dbc.internal.InstantFactory
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichStatement
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao
@@ -99,6 +103,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log,
settings.logDbCallsExceeding)(ec, system)
protected val stateTable = settings.durableStateTableWithSchema
+ protected implicit val statePayloadCodec: PayloadCodec =
settings.durableStatePayloadCodec
private val selectStateSql: String = sql"""
SELECT revision, state_ser_id, state_ser_manifest, state_payload,
db_timestamp
@@ -222,8 +227,8 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
private def getPayload(row: Row): Option[Array[Byte]] = {
val serId = row.get("state_ser_id", classOf[Integer])
- val rowPayload = row.get("state_payload", classOf[Array[Byte]])
- if (serId == 0 && (rowPayload == null || rowPayload.isEmpty))
+ val rowPayload = row.getPayload("state_payload")
+ if (serId == 0 && (rowPayload == null ||
util.Arrays.equals(statePayloadCodec.nonePayload, rowPayload)))
None // delete marker
else
Option(rowPayload)
@@ -254,7 +259,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
.bind(3, state.revision)
.bind(4, state.serId)
.bind(5, state.serManifest)
- .bind(6, state.payload.getOrElse(Array.emptyByteArray))
+ .bindPayloadOption(6, state.payload)
bindTags(stmt, 7)
}
.recoverWith { case _: R2dbcDataIntegrityViolationException =>
@@ -271,7 +276,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
.bind(0, state.revision)
.bind(1, state.serId)
.bind(2, state.serManifest)
- .bind(3, state.payload.getOrElse(Array.emptyByteArray))
+ .bindPayloadOption(3, state.payload)
bindTags(stmt, 4)
if (settings.dbTimestampMonotonicIncreasing) {
@@ -347,7 +352,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
.bind(3, revision)
.bind(4, 0)
.bind(5, "")
- .bind(6, Array.emptyByteArray)
+ .bindPayloadOption(6, None)
.bindNull(7, classOf[Array[String]])
}
.recoverWith { case _: R2dbcDataIntegrityViolationException =>
@@ -363,7 +368,7 @@ private[r2dbc] class DurableStateDao(settings:
StateSettings, connectionFactory:
.bind(0, revision)
.bind(1, 0)
.bind(2, "")
- .bind(3, Array.emptyByteArray)
+ .bindPayloadOption(3, None)
if (settings.dbTimestampMonotonicIncreasing) {
if (settings.durableStateAssertSingleWriter)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/JsonSerializable.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/JsonSerializable.scala
new file mode 100644
index 0000000..77d7890
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/JsonSerializable.scala
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc
+
+trait JsonSerializable // marker trait for test messages; TestConfig binds it to the jackson-json serializer
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/PayloadSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/PayloadSpec.scala
new file mode 100644
index 0000000..1078856
--- /dev/null
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/PayloadSpec.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc
+
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.r2dbc.TestActors.DurableStatePersister
+import pekko.persistence.r2dbc.TestActors.Persister
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/**
+ * The purpose of this test is to verify JSONB payloads, but it can also be
run with ordinary BYTEA payloads. To test
+ * with JSONB the db schema should be created with
`ddl-scripts/create_tables_postgres_jsonb.sql` and start `sbt` with
+ *
+ * {{{
+ * sbt -Dpekko.persistence.r2dbc.journal.payload-column-type=JSONB
-Dpekko.persistence.r2dbc.snapshot.payload-column-type=JSONB
-Dpekko.persistence.r2dbc.state.payload-column-type=JSONB
+ * }}}
+ *
+ * Note that other tests may fail with JSONB column type because the test data
isn't in json.
+ */
+object PayloadSpec {
+ val config = ConfigFactory // TestConfig.config with jackson-json compression switched off (presumably so stored payloads stay plain JSON text)
+ .parseString("""
+ pekko.serialization.jackson.jackson-json.compression.algorithm = off
+ """)
+ .withFallback(TestConfig.config)
+
+ final case class TestRow(pid: String, seqNr: Long, payload: Array[Byte]) // one row as read back directly from a table by the select*Row helpers
+
+ final case class TestJson(a: String, i: Int) extends JsonSerializable // test message carrying the JsonSerializable marker
+
+ implicit class JsonString(val s: String) extends AnyVal {
+ // don't care about json formatting, which may be different for JSONB and
BYTEA
+ def clearWhitespace: String = { // returns s with every whitespace character removed
+ val sb = new StringBuilder(s.length)
+ s.foreach { ch =>
+ if (!ch.isWhitespace)
+ sb.append(ch)
+ }
+ sb.result()
+ }
+ }
+}
+
+class PayloadSpec
+ extends ScalaTestWithActorTestKit(PayloadSpec.config)
+ with AnyWordSpecLike
+ with TestDbLifecycle
+ with TestData
+ with LogCapturing {
+ import PayloadSpec._
+
+ override def typedSystem: ActorSystem[_] = system
+
+ private def testJournalPersister(persistenceId: String, msg: Any): Unit = { // persists msg, then checks that a second Persister with the same id reports it as state
+ val probe = createTestProbe[Any]()
+ val ref1 = spawn(Persister(persistenceId))
+ ref1 ! Persister.PersistWithAck(msg, probe.ref)
+ probe.expectMessage(Done)
+ testKit.stop(ref1)
+
+ val ref2 = spawn(Persister(persistenceId)) // new actor instance for the same persistenceId
+ ref2 ! Persister.GetState(probe.ref)
+ probe.receiveMessage().toString.clearWhitespace shouldBe
msg.toString.clearWhitespace // string forms are compared ignoring whitespace
+ testKit.stop(ref2)
+ }
+
+ private def testDurableStatePersister(persistenceId: String, msg: Any): Unit
= { // persists msg as durable state, then checks that a second actor with the same id reports it
+ val probe = createTestProbe[Any]()
+ val ref1 = spawn(DurableStatePersister(persistenceId))
+ ref1 ! DurableStatePersister.PersistWithAck(msg, probe.ref)
+ probe.expectMessage(Done)
+ testKit.stop(ref1)
+
+ val ref2 = spawn(DurableStatePersister(persistenceId)) // new actor instance for the same persistenceId
+ ref2 ! DurableStatePersister.GetState(probe.ref)
+ probe.receiveMessage().toString.clearWhitespace shouldBe
msg.toString.clearWhitespace // string forms are compared ignoring whitespace
+ testKit.stop(ref2)
+ }
+
+ private def selectJournalRow(persistenceId: String): TestRow = { // reads one journal row for persistenceId straight from the journal table
+ implicit val codec: PayloadCodec = journalSettings.journalPayloadCodec // implicit codec required by row.getPayload
+
+ r2dbcExecutor
+ .selectOne[TestRow]("test")(
+ connection =>
+ connection.createStatement(
+ s"select * from ${journalSettings.journalTableWithSchema} where
persistence_id = '$persistenceId'"),
+ row => {
+ val payload = row.getPayload("event_payload") // decoded with the journal's configured codec
+ TestRow(
+ pid = row.get("persistence_id", classOf[String]),
+ seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
+ payload)
+ })
+ .futureValue
+ .get // NOTE(review): presumably an Option, so this throws when no row matched - confirm against R2dbcExecutor.selectOne
+ }
+
+ private def selectSnapshotRow(persistenceId: String): TestRow = { // reads one snapshot row for persistenceId straight from the snapshot table
+ implicit val codec: PayloadCodec = snapshotSettings.snapshotPayloadCodec // implicit codec required by row.getPayload
+
+ r2dbcExecutor
+ .selectOne[TestRow]("test")(
+ connection =>
+ connection.createStatement(
+ s"select * from ${snapshotSettings.snapshotsTableWithSchema} where
persistence_id = '$persistenceId'"),
+ row => {
+ val payload = row.getPayload("snapshot") // decoded with the snapshot store's configured codec
+ TestRow(
+ pid = row.get("persistence_id", classOf[String]),
+ seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
+ payload)
+ })
+ .futureValue
+ .get // NOTE(review): presumably an Option, so this throws when no row matched - confirm against R2dbcExecutor.selectOne
+ }
+
+ private def selectDurableStateRow(persistenceId: String): TestRow = { // reads the durable state row for persistenceId straight from the state table
+ implicit val codec: PayloadCodec = stateSettings.durableStatePayloadCodec // implicit codec required by row.getPayload
+
+ r2dbcExecutor
+ .selectOne[TestRow]("test")(
+ connection =>
+ connection.createStatement(
+ s"select * from ${stateSettings.durableStateTableWithSchema} where
persistence_id = '$persistenceId'"),
+ row => {
+ val payload = row.getPayload("state_payload") // decoded with the durable state store's configured codec
+ TestRow(
+ pid = row.get("persistence_id", classOf[String]),
+ seqNr = row.get[java.lang.Long]("revision",
classOf[java.lang.Long]), // the revision column is carried in TestRow.seqNr
+ payload)
+ })
+ .futureValue
+ .get // NOTE(review): presumably an Option, so this throws when no row matched - confirm against R2dbcExecutor.selectOne
+ }
+
+ "journal" should {
+ "store json string payload" in {
+ val persistenceId = nextPid(nextEntityType())
+ val msg = """{"a": "b"}"""
+ testJournalPersister(persistenceId, msg)
+
+ val row = selectJournalRow(persistenceId)
+ new String(row.payload, UTF_8) shouldBe msg // exact comparison, unlike the whitespace-insensitive checks in the other tests
+ }
+
+ "store serialized json payload" in {
+ val persistenceId = nextPid(nextEntityType())
+ val msg = TestJson("b", 17) // JsonSerializable message rather than a raw JSON string
+ testJournalPersister(persistenceId, msg)
+
+ val row = selectJournalRow(persistenceId)
+ new String(row.payload, UTF_8).clearWhitespace shouldBe
"""{"a":"b","i":17}""".clearWhitespace
+ }
+ }
+
+ "snapshot store" should {
+ "store json string payload" in {
+ val persistenceId = nextPid(nextEntityType())
+ val msg = """{"a": "snap"}"""
+ testJournalPersister(persistenceId, msg) // NOTE(review): the assertion below needs Persister to have written a snapshot for this message - presumably triggered by the payload text; confirm in TestActors
+
+ val row = selectSnapshotRow(persistenceId)
+ new String(row.payload, UTF_8).clearWhitespace shouldBe
msg.clearWhitespace
+ }
+ }
+
+ "durable state" should {
+ "store json string payload" in {
+ val persistenceId = nextPid(nextEntityType())
+ val msg = """{"a": "b"}"""
+ testDurableStatePersister(persistenceId, msg)
+
+ val row = selectDurableStateRow(persistenceId)
+ new String(row.payload, UTF_8).clearWhitespace shouldBe
msg.clearWhitespace
+ }
+
+ "store serialized json payload" in {
+ val persistenceId = nextPid(nextEntityType())
+ val msg = TestJson("b", 17) // JsonSerializable message rather than a raw JSON string
+ testDurableStatePersister(persistenceId, msg)
+
+ val row = selectDurableStateRow(persistenceId)
+ new String(row.payload, UTF_8).clearWhitespace shouldBe
"""{"a":"b","i":17}""".clearWhitespace
+ }
+
+ "store delete marker" in {
+ val persistenceId = nextPid(nextEntityType())
+ val probe = createTestProbe[Any]()
+
+ val msg = """{"a": "to be deleted"}"""
+
+ // persist first so we have a known row at revision 1
+ val ref1 = spawn(DurableStatePersister(persistenceId))
+ ref1 ! DurableStatePersister.PersistWithAck(msg, probe.ref)
+ probe.expectMessage(Done)
+
+ val row1 = selectDurableStateRow(persistenceId)
+ new String(row1.payload, UTF_8).clearWhitespace shouldBe
msg.clearWhitespace
+
+ // delete after change: updates the row to a delete marker (revision 2)
+ ref1 ! DurableStatePersister.DeleteWithAck(probe.ref)
+ probe.expectMessage(Done)
+ testKit.stop(ref1)
+
+ val row2 = selectDurableStateRow(persistenceId)
+ row2.payload.toVector shouldBe
stateSettings.durableStatePayloadCodec.nonePayload.toVector // the stored payload is the codec's nonePayload placeholder
+
+ val ref2 = spawn(DurableStatePersister(persistenceId))
+ ref2 ! DurableStatePersister.GetState(probe.ref)
+ probe.expectMessage("") // after delete a fresh actor reports the empty-string state
+
+ // persist new state after the delete
+ val msg2 = """{"b": "new state"}"""
+ ref2 ! DurableStatePersister.PersistWithAck(msg2, probe.ref)
+ probe.expectMessage(Done)
+ testKit.stop(ref2)
+
+ val row3 = selectDurableStateRow(persistenceId)
+ new String(row3.payload, UTF_8).clearWhitespace shouldBe
msg2.clearWhitespace
+
+ // delete again
+ val ref3 = spawn(DurableStatePersister(persistenceId))
+ ref3 ! DurableStatePersister.DeleteWithAck(probe.ref)
+ probe.expectMessage(Done)
+ testKit.stop(ref3)
+
+ val row4 = selectDurableStateRow(persistenceId)
+ row4.payload.toVector shouldBe
stateSettings.durableStatePayloadCodec.nonePayload.toVector // the row is back to the placeholder payload
+ }
+ }
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
index bf336a0..f555d9a 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
@@ -119,6 +119,7 @@ object TestActors {
final case class PersistWithAck(payload: Any, replyTo: ActorRef[Done])
extends Command
final case class DeleteWithAck(replyTo: ActorRef[Done]) extends Command
final case class Ping(replyTo: ActorRef[Done]) extends Command
+ final case class GetState(replyTo: ActorRef[Any]) extends Command
final case class Stop(replyTo: ActorRef[Done]) extends Command
def apply(pid: String): Behavior[Command] =
@@ -129,7 +130,7 @@ object TestActors {
DurableStateBehavior[Command, Any](
persistenceId = pid,
"",
- { (_, command) =>
+ { (state, command) =>
command match {
case command: Persist =>
context.log.debug(
@@ -149,6 +150,9 @@ object TestActors {
context.log
.debug("Delete pid [{}], seqNr [{}]", pid.id,
DurableStateBehavior.lastSequenceNumber(context) + 1)
Effect.delete[Any]().thenRun(_ => command.replyTo ! Done)
+ case GetState(replyTo) =>
+ replyTo ! state
+ Effect.none
case Ping(replyTo) =>
replyTo ! Done
Effect.none
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
index 08789e5..f066678 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
@@ -70,6 +70,7 @@ object TestConfig {
pekko.actor {
serialization-bindings {
"org.apache.pekko.persistence.r2dbc.CborSerializable" = jackson-cbor
+ "org.apache.pekko.persistence.r2dbc.JsonSerializable" = jackson-json
}
}
pekko.actor.testkit.typed.default-timeout = 10s
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
index 5b29f6d..2b836c7 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
@@ -21,6 +21,8 @@ import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorSystem
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
import pekko.persistence.r2dbc.JournalSettings
import pekko.persistence.r2dbc.TestActors.Persister
import pekko.persistence.r2dbc.TestConfig
@@ -40,6 +42,7 @@ class PersistTimestampSpec
override def typedSystem: ActorSystem[_] = system
private val settings =
JournalSettings(system.settings.config.getConfig("pekko.persistence.r2dbc.journal"))
private val serialization = SerializationExtension(system)
+ private implicit val journalPayloadCodec: PayloadCodec =
settings.journalPayloadCodec
case class Row(pid: String, seqNr: Long, dbTimestamp: Instant, event: String)
@@ -79,7 +82,7 @@ class PersistTimestampSpec
row => {
val event = serialization
.deserialize(
- row.get("event_payload", classOf[Array[Byte]]),
+ row.getPayload("event_payload"),
row.get[Integer]("event_ser_id", classOf[Integer]),
row.get("event_ser_manifest", classOf[String]))
.get
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index 061b6f9..e3e12fe 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -26,6 +26,8 @@ import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.QuerySettings
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichStatement
import pekko.persistence.r2dbc.TestConfig
import pekko.persistence.r2dbc.TestData
import pekko.persistence.r2dbc.TestDbLifecycle
@@ -58,6 +60,7 @@ class EventsBySliceBacktrackingSpec
override def typedSystem: ActorSystem[_] = system
private val settings =
QuerySettings(system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
+ private implicit val journalPayloadCodec: PayloadCodec =
settings.journalPayloadCodec
private val query = PersistenceQuery(testKit.system)
.readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
@@ -83,7 +86,7 @@ class EventsBySliceBacktrackingSpec
.bind(3, seqNr)
.bind(4, timestamp)
.bind(5, stringSerializer.identifier)
- .bind(6, stringSerializer.toBinary(event))
+ .bindPayload(6, stringSerializer.toBinary(event))
}
result.futureValue shouldBe 1
}
diff --git a/ddl-scripts/create_tables_postgres_jsonb.sql b/ddl-scripts/create_tables_postgres_jsonb.sql
new file mode 100644
index 0000000..6c2e36a
--- /dev/null
+++ b/ddl-scripts/create_tables_postgres_jsonb.sql
@@ -0,0 +1,97 @@
+-- # SPDX-License-Identifier: Apache-2.0
+
+CREATE TABLE IF NOT EXISTS event_journal(
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ db_timestamp timestamp with time zone NOT NULL,
+
+ event_ser_id INTEGER NOT NULL,
+ event_ser_manifest VARCHAR(255) NOT NULL,
+ event_payload JSONB NOT NULL,
+
+ deleted BOOLEAN DEFAULT FALSE NOT NULL,
+ writer VARCHAR(255) NOT NULL,
+ adapter_manifest VARCHAR(255),
+ tags TEXT ARRAY,
+
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BYTEA,
+
+ PRIMARY KEY(persistence_id, seq_nr)
+);
+
+-- `event_journal_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice, entity_type, db_timestamp, seq_nr);
+
+CREATE TABLE IF NOT EXISTS snapshot(
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ write_timestamp BIGINT NOT NULL,
+ ser_id INTEGER NOT NULL,
+ ser_manifest VARCHAR(255) NOT NULL,
+ snapshot JSONB NOT NULL,
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BYTEA,
+
+ PRIMARY KEY(persistence_id)
+);
+
+CREATE TABLE IF NOT EXISTS durable_state (
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ revision BIGINT NOT NULL,
+ db_timestamp timestamp with time zone NOT NULL,
+
+ state_ser_id INTEGER NOT NULL,
+ state_ser_manifest VARCHAR(255),
+ state_payload JSONB NOT NULL,
+ tags TEXT ARRAY,
+
+ PRIMARY KEY(persistence_id, revision)
+);
+
+-- `durable_state_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);
+
+-- Primitive offset types are stored in this table.
+-- If only timestamp based offsets are used this table is optional.
+-- Configure pekko.projection.r2dbc.offset-store.offset-table="" if the table is not created.
+CREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ current_offset VARCHAR(255) NOT NULL,
+ manifest VARCHAR(32) NOT NULL,
+ mergeable BOOLEAN NOT NULL,
+ last_updated BIGINT NOT NULL,
+ PRIMARY KEY(projection_name, projection_key)
+);
+
+-- Timestamp based offsets are stored in this table.
+CREATE TABLE IF NOT EXISTS pekko_projection_timestamp_offset_store (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ slice INT NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ -- timestamp_offset is the db_timestamp of the original event
+ timestamp_offset timestamp with time zone NOT NULL,
+ -- timestamp_consumed is when the offset was stored
+ -- the consumer lag is timestamp_consumed - timestamp_offset
+ timestamp_consumed timestamp with time zone NOT NULL,
+ PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
+);
+
+CREATE TABLE IF NOT EXISTS pekko_projection_management (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ paused BOOLEAN NOT NULL,
+ last_updated BIGINT NOT NULL,
+ PRIMARY KEY(projection_name, projection_key)
+);
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
index aeddf55..ecf6730 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -30,7 +30,8 @@ import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichStatement
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
@@ -145,6 +146,7 @@ class EventSourcedEndToEndSpec
private val querySettings =
QuerySettings(system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
private val projectionSettings = R2dbcProjectionSettings(system)
private val stringSerializer =
SerializationExtension(system).serializerFor(classOf[String])
+ private implicit val journalPayloadCodec: PayloadCodec =
querySettings.journalPayloadCodec
override protected def beforeAll(): Unit = {
super.beforeAll()
@@ -171,7 +173,7 @@ class EventSourcedEndToEndSpec
.bind(3, seqNr)
.bind(4, timestamp)
.bind(5, stringSerializer.identifier)
- .bind(6, stringSerializer.toBinary(event))
+ .bindPayload(6, stringSerializer.toBinary(event))
}
result.futureValue shouldBe 1
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]