This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new a593069  Port akka-persistence-r2dbc PR#239: add JSONB/JSON payload column type support (#355)
a593069 is described below

commit a593069f371a7fe3c40a1815476021fc96b472d2
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 18:11:41 2026 +0100

    Port akka-persistence-r2dbc PR#239: add JSONB/JSON payload column type support (#355)
    
    * Port akka-persistence-r2dbc PR#239: add JSONB/JSON payload column type support
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/2b268242-2841-4d1d-8983-0a15eca8890a
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * Port akka-persistence-r2dbc PR#388: nonePayload, bindPayloadOption, PayloadSpec test
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/6ce25a3f-8305-4ade-bbda-e17e51fe134d
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix PayloadSpec: remove invalid row1 assertion for delete-before-persist case
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/66a2b6dc-2582-4911-8428-ef2187f45089
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix PayloadSpec: restructure store delete marker test to persist-first to avoid Pekko 1.3.0 intermittent failure
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/6db8d3b2-eac7-413b-9f05-3e4d313fa6f9
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update build-test.yml
    
    * Update build-test.yml
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .github/workflows/build-test.yml                   |  47 ++++
 core/src/main/resources/reference.conf             |  11 +
 .../pekko/persistence/r2dbc/R2dbcSettings.scala    |  22 ++
 .../r2dbc/internal/EventsByPersistenceIdDao.scala  |   6 +-
 .../persistence/r2dbc/internal/PayloadCodec.scala  |  63 ++++++
 .../persistence/r2dbc/journal/JournalDao.scala     |   7 +-
 .../r2dbc/query/scaladsl/QueryDao.scala            |   7 +-
 .../persistence/r2dbc/snapshot/SnapshotDao.scala   |  42 ++--
 .../r2dbc/state/scaladsl/DurableStateDao.scala     |  17 +-
 .../pekko/persistence/r2dbc/JsonSerializable.scala |  16 ++
 .../pekko/persistence/r2dbc/PayloadSpec.scala      | 252 +++++++++++++++++++++
 .../pekko/persistence/r2dbc/TestActors.scala       |   6 +-
 .../pekko/persistence/r2dbc/TestConfig.scala       |   1 +
 .../r2dbc/journal/PersistTimestampSpec.scala       |   5 +-
 .../query/EventsBySliceBacktrackingSpec.scala      |   5 +-
 ddl-scripts/create_tables_postgres_jsonb.sql       |  97 ++++++++
 .../r2dbc/EventSourcedEndToEndSpec.scala           |   6 +-
 17 files changed, 575 insertions(+), 35 deletions(-)

diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index d00af47..4914c36 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -90,6 +90,53 @@ jobs:
       - name: test
         run: sbt ++${{ matrix.SCALA_VERSION }} test
 
+  test-postgres-jsonb:
+    name: Run test with Postgres (JSONB)
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        SCALA_VERSION: [ 2.13, 3.3 ]
+        JAVA_VERSION: [ 17 ]
+    if: github.repository == 'apache/pekko-persistence-r2dbc'
+    steps:
+      - name: Checkout
+        uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+        with:
+          fetch-depth: 0
+          fetch-tags: true
+
+      - name: Checkout GitHub merge
+        if: github.event.pull_request
+        run: |-
+          git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch
+          git checkout scratch
+
+      - name: Setup Java ${{ matrix.JAVA_VERSION }}
+        uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5.2.0
+        with:
+          distribution: temurin
+          java-version: ${{ matrix.JAVA_VERSION }}
+
+      - name: Install sbt
+        uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+
+      - name: Cache Coursier cache
+        uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # 6.4.8.1.0
+
+      - name: Enable jvm-opts
+        run: cp .jvmopts-ci .jvmopts
+
+      - name: Start DB
+        run: |-
+          docker compose -f docker/docker-compose-postgres.yml up -d
+          # TODO: could we poll the port instead of sleep?
+          sleep 10
+          docker exec -i docker-postgres-db-1 psql -U postgres -t < ddl-scripts/create_tables_postgres_jsonb.sql
+
+      - name: test
+        run: |-
+          sbt -Dpekko.persistence.r2dbc.journal.payload-column-type=JSONB -Dpekko.persistence.r2dbc.snapshot.payload-column-type=JSONB -Dpekko.persistence.r2dbc.state.payload-column-type=JSONB "core/testOnly *persistence.r2dbc.PayloadSpec"
+
   test-yugabyte:
     name: Run tests with Yugabyte
     runs-on: ubuntu-latest
diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index dac6e01..ecd8a7f 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -8,6 +8,9 @@ pekko.persistence.r2dbc {
     # name of the table to use for events
     table = "event_journal"
 
+    # the column type to use for event payloads (BYTEA, JSON or JSONB)
+    payload-column-type = "BYTEA"
+
     # Otherwise it would be a pinned dispatcher, see https://github.com/akka/akka/issues/31058
     plugin-dispatcher = "pekko.actor.default-dispatcher"
 
@@ -59,6 +62,9 @@ pekko.persistence.r2dbc {
     class = "org.apache.pekko.persistence.r2dbc.snapshot.R2dbcSnapshotStore"
     table = "snapshot"
 
+    # the column type to use for snapshot payloads (BYTEA, JSON or JSONB)
+    payload-column-type = "BYTEA"
+
     # Otherwise it would be a pinned dispatcher, see https://github.com/akka/akka/issues/31058
     plugin-dispatcher = "pekko.actor.default-dispatcher"
 
@@ -81,6 +87,9 @@ pekko.persistence.r2dbc {
 
     table = "durable_state"
 
+    # the column type to use for durable state payloads (BYTEA, JSON or JSONB)
+    payload-column-type = "BYTEA"
+
     # When this is enabled the updates verifies that the revision is +1 of
     # previous revision. There might be a small performance gain if
     # this is disabled.
@@ -121,6 +130,8 @@ pekko.persistence.r2dbc {
 
     table = ${pekko.persistence.r2dbc.journal.table}
 
+    payload-column-type = ${pekko.persistence.r2dbc.journal.payload-column-type}
+
     publish-events =  ${pekko.persistence.r2dbc.journal.publish-events}
 
     # When journal publish-events is enabled a best effort deduplication can be enabled by setting
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index ff1a234..93402a6 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -21,6 +21,7 @@ import scala.jdk.DurationConverters._
 import org.apache.pekko
 import pekko.annotation.InternalApi
 import pekko.annotation.InternalStableApi
+import pekko.persistence.r2dbc.internal.PayloadCodec
 import pekko.util.Helpers.toRootLowerCase
 import com.typesafe.config.Config
 
@@ -63,6 +64,9 @@ final class JournalSettings(val config: Config) extends ConnectionSettings with
 
   val journalTable: String = config.getString("table")
   val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") + journalTable
+
+  val journalPayloadCodec: PayloadCodec =
+    if (useJsonPayload("payload-column-type")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec
 }
 
 /**
@@ -82,6 +86,9 @@ final class SnapshotSettings(val config: Config) extends ConnectionSettings with
 
   val snapshotsTable: String = config.getString("table")
   val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + snapshotsTable
+
+  val snapshotPayloadCodec: PayloadCodec =
+    if (useJsonPayload("payload-column-type")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec
 }
 
 /**
@@ -105,6 +112,9 @@ final class StateSettings(val config: Config) extends ConnectionSettings with Us
   val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") + durableStateTable
 
   val durableStateAssertSingleWriter: Boolean = config.getBoolean("assert-single-writer")
+
+  val durableStatePayloadCodec: PayloadCodec =
+    if (useJsonPayload("payload-column-type")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec
 }
 
 /**
@@ -127,6 +137,9 @@ final class QuerySettings(val config: Config) extends ConnectionSettings with Us
   val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") + journalTable
 
   val deduplicateCapacity: Int = config.getInt("deduplicate-capacity")
+
+  val journalPayloadCodec: PayloadCodec =
+    if (useJsonPayload("payload-column-type")) PayloadCodec.JsonCodec else PayloadCodec.ByteArrayCodec
 }
 
 /**
@@ -154,6 +167,15 @@ trait ConnectionSettings {
       case "off" => -1.millis
       case _     => config.getDuration("log-db-calls-exceeding").toScala
     }
+
+  protected def useJsonPayload(configKey: String): Boolean =
+    config.getString(configKey).toUpperCase match {
+      case "BYTEA"          => false
+      case "JSONB" | "JSON" => true
+      case t                =>
+        throw new IllegalStateException(
+          s"Expected $configKey to be one of 'BYTEA', 'JSON' or 'JSONB' but found '$t'")
+    }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
index 3664e92..3862ce0 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
@@ -20,6 +20,8 @@ import pekko.NotUsed
 import pekko.annotation.InternalApi
 import pekko.persistence.r2dbc.BufferSize
 import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
 import pekko.persistence.r2dbc.journal.JournalDao.readMetadata
@@ -61,6 +63,8 @@ private[r2dbc] trait EventsByPersistenceIdDao {
 
   protected def settings: BufferSize
 
+  implicit protected def journalPayloadCodec: PayloadCodec
+
   private lazy val selectEventsSql = sql"""
     SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, $statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, meta_payload, tags
     from $journalTable
@@ -141,7 +145,7 @@ private[r2dbc] trait EventsByPersistenceIdDao {
           seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
           dbTimestamp = row.get("db_timestamp", classOf[Instant]),
           readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
-          payload = Some(row.get("event_payload", classOf[Array[Byte]])),
+          payload = Some(row.getPayload("event_payload")),
           serId = row.get[Integer]("event_ser_id", classOf[Integer]),
           serManifest = row.get("event_ser_manifest", classOf[String]),
           writerUuid = row.get("writer", classOf[String]),
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PayloadCodec.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PayloadCodec.scala
new file mode 100644
index 0000000..f15f441
--- /dev/null
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PayloadCodec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.pekko.annotation.InternalApi
+import io.r2dbc.postgresql.codec.Json
+import io.r2dbc.spi.Row
+import io.r2dbc.spi.Statement
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] sealed trait PayloadCodec {
+  def payloadClass: Class[_]
+  def encode(bytes: Array[Byte]): Any
+  def decode(payload: Any): Array[Byte]
+  def nonePayload: Array[Byte]
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object PayloadCodec {
+  case object ByteArrayCodec extends PayloadCodec {
+    override def payloadClass: Class[Array[Byte]] = classOf[Array[Byte]]
+    override def encode(bytes: Array[Byte]): Array[Byte] = bytes
+    override def decode(payload: Any): Array[Byte] = payload.asInstanceOf[Array[Byte]]
+    override def nonePayload: Array[Byte] = Array.emptyByteArray
+  }
+  case object JsonCodec extends PayloadCodec {
+    override def payloadClass: Class[Json] = classOf[Json]
+    override def encode(bytes: Array[Byte]): Json = Json.of(bytes)
+    override def decode(payload: Any): Array[Byte] =
+      if (payload == null) null else payload.asInstanceOf[Json].asArray()
+    override val nonePayload: Array[Byte] = "{}".getBytes(UTF_8)
+  }
+  implicit class RichStatement(val statement: Statement)(implicit codec: PayloadCodec) extends AnyRef {
+    def bindPayload(index: Int, payload: Array[Byte]): Statement =
+      statement.bind(index, codec.encode(payload))
+
+    def bindPayloadOption(index: Int, payloadOption: Option[Array[Byte]]): Statement =
+      payloadOption match {
+        case Some(payload) => bindPayload(index, payload)
+        case None          => bindPayload(index, codec.nonePayload)
+      }
+  }
+  implicit class RichRow(val row: Row)(implicit codec: PayloadCodec) extends AnyRef {
+    def getPayload(name: String): Array[Byte] = codec.decode(row.get(name, codec.payloadClass))
+  }
+}
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index c61d154..5be1579 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -27,6 +27,8 @@ import pekko.persistence.r2dbc.JournalSettings
 import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
 import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichStatement
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
@@ -112,6 +114,7 @@ private[r2dbc] class JournalDao(val settings: JournalSettings, connectionFactory
   protected val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, settings.logDbCallsExceeding)(ec, system)
 
   protected val journalTable: String = settings.journalTableWithSchema
+  protected implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec
 
   protected val (insertEventWithParameterTimestampSql: String, insertEventWithTransactionTimestampSql: String) = {
     val baseSql =
@@ -189,7 +192,7 @@ private[r2dbc] class JournalDao(val settings: JournalSettings, connectionFactory
         .bind(5, "") // FIXME event adapter
         .bind(6, write.serId)
         .bind(7, write.serManifest)
-        .bind(8, write.payload.get)
+        .bindPayload(8, write.payload.get)
 
       if (write.tags.isEmpty)
         stmt.bindNull(9, classOf[Array[String]])
@@ -290,7 +293,7 @@ private[r2dbc] class JournalDao(val settings: JournalSettings, connectionFactory
           .bind(5, "")
           .bind(6, 0)
           .bind(7, "")
-          .bind(8, Array.emptyByteArray)
+          .bindPayloadOption(8, None)
           .bind(9, true)
       }
 
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index b79f7ac..c23bcff 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -32,6 +32,8 @@ import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
 import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
 import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
 import pekko.persistence.r2dbc.internal.InstantFactory
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.JournalDao
@@ -82,6 +84,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, connectionFactory: Co
   protected lazy val statementTimestampSql: String = "statement_timestamp()"
 
   protected val journalTable = settings.journalTableWithSchema
+  protected implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec
 
   private val currentDbTimestampSql =
     "SELECT transaction_timestamp() AS db_timestamp"
@@ -224,7 +227,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, connectionFactory: Co
             seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
             dbTimestamp = row.get("db_timestamp", classOf[Instant]),
             readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
-            payload = Some(row.get("event_payload", classOf[Array[Byte]])),
+            payload = Some(row.getPayload("event_payload")),
             serId = row.get[Integer]("event_ser_id", classOf[Integer]),
             serManifest = row.get("event_ser_manifest", classOf[String]),
             writerUuid = "", // not need in this query
@@ -310,7 +313,7 @@ private[r2dbc] class QueryDao(val settings: QuerySettings, connectionFactory: Co
           seqNr,
           dbTimestamp = row.get("db_timestamp", classOf[Instant]),
           readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
-          payload = Some(row.get("event_payload", classOf[Array[Byte]])),
+          payload = Some(row.getPayload("event_payload")),
           serId = row.get[Integer]("event_ser_id", classOf[Integer]),
           serManifest = row.get("event_ser_manifest", classOf[String]),
           writerUuid = "", // not need in this query
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
index 7cf77dc..de0cdf3 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
@@ -23,6 +23,9 @@ import pekko.persistence.SnapshotSelectionCriteria
 import pekko.persistence.r2dbc.ConnectionFactoryProvider
 import pekko.persistence.r2dbc.Dialect
 import pekko.persistence.r2dbc.SnapshotSettings
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichStatement
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.snapshot.mysql.MySQLSnapshotDao
@@ -50,24 +53,6 @@ private[r2dbc] object SnapshotDao {
 
   final case class SerializedSnapshotMetadata(payload: Array[Byte], serializerId: Int, serializerManifest: String)
 
-  private def collectSerializedSnapshot(row: Row): SerializedSnapshotRow =
-    SerializedSnapshotRow(
-      row.get("persistence_id", classOf[String]),
-      row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
-      row.get[java.lang.Long]("write_timestamp", classOf[java.lang.Long]),
-      row.get("snapshot", classOf[Array[Byte]]),
-      row.get[Integer]("ser_id", classOf[Integer]),
-      row.get("ser_manifest", classOf[String]), {
-        val metaSerializerId = row.get[Integer]("meta_ser_id", classOf[Integer])
-        if (metaSerializerId eq null) None
-        else
-          Some(
-            SerializedSnapshotMetadata(
-              row.get("meta_payload", classOf[Array[Byte]]),
-              metaSerializerId,
-              row.get("meta_ser_manifest", classOf[String])))
-      })
-
   def fromConfig(
       settings: SnapshotSettings,
       config: Config
@@ -100,6 +85,25 @@ private[r2dbc] class SnapshotDao(settings: SnapshotSettings, connectionFactory:
   protected val snapshotTable: String = settings.snapshotsTableWithSchema
   private val persistenceExt = Persistence(system)
   private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, settings.logDbCallsExceeding)(ec, system)
+  protected implicit val snapshotPayloadCodec: PayloadCodec = settings.snapshotPayloadCodec
+
+  private def collectSerializedSnapshot(row: Row): SerializedSnapshotRow =
+    SerializedSnapshotRow(
+      row.get("persistence_id", classOf[String]),
+      row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
+      row.get[java.lang.Long]("write_timestamp", classOf[java.lang.Long]),
+      row.getPayload("snapshot"),
+      row.get[Integer]("ser_id", classOf[Integer]),
+      row.get("ser_manifest", classOf[String]), {
+        val metaSerializerId = row.get[Integer]("meta_ser_id", classOf[Integer])
+        if (metaSerializerId eq null) None
+        else
+          Some(
+            SerializedSnapshotMetadata(
+              row.get("meta_payload", classOf[Array[Byte]]),
+              metaSerializerId,
+              row.get("meta_ser_manifest", classOf[String])))
+      })
 
   protected val upsertSql = sql"""
     INSERT INTO $snapshotTable
@@ -211,7 +215,7 @@ private[r2dbc] class SnapshotDao(settings: SnapshotSettings, connectionFactory:
               .bind(2, serializedRow.persistenceId)
               .bind(3, serializedRow.seqNr)
               .bind(4, serializedRow.writeTimestamp)
-              .bind(5, serializedRow.snapshot)
+              .bindPayload(5, serializedRow.snapshot)
               .bind(6, serializedRow.serializerId)
               .bind(7, serializedRow.serializerManifest)
 
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 2b988de..8b45992 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -14,6 +14,7 @@
 package org.apache.pekko.persistence.r2dbc.state.scaladsl
 
 import java.time.Instant
+import java.util
 
 import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration.{ Duration, FiniteDuration }
@@ -31,6 +32,9 @@ import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
 import pekko.persistence.r2dbc.internal.InstantFactory
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichStatement
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao
@@ -99,6 +103,7 @@ private[r2dbc] class DurableStateDao(settings: StateSettings, connectionFactory:
   private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, settings.logDbCallsExceeding)(ec, system)
 
   protected val stateTable = settings.durableStateTableWithSchema
+  protected implicit val statePayloadCodec: PayloadCodec = settings.durableStatePayloadCodec
 
   private val selectStateSql: String = sql"""
     SELECT revision, state_ser_id, state_ser_manifest, state_payload, db_timestamp
@@ -222,8 +227,8 @@ private[r2dbc] class DurableStateDao(settings: StateSettings, connectionFactory:
 
   private def getPayload(row: Row): Option[Array[Byte]] = {
     val serId = row.get("state_ser_id", classOf[Integer])
-    val rowPayload = row.get("state_payload", classOf[Array[Byte]])
-    if (serId == 0 && (rowPayload == null || rowPayload.isEmpty))
+    val rowPayload = row.getPayload("state_payload")
+    if (serId == 0 && (rowPayload == null || util.Arrays.equals(statePayloadCodec.nonePayload, rowPayload)))
       None // delete marker
     else
       Option(rowPayload)
@@ -254,7 +259,7 @@ private[r2dbc] class DurableStateDao(settings: StateSettings, connectionFactory:
               .bind(3, state.revision)
               .bind(4, state.serId)
               .bind(5, state.serManifest)
-              .bind(6, state.payload.getOrElse(Array.emptyByteArray))
+              .bindPayloadOption(6, state.payload)
             bindTags(stmt, 7)
           }
           .recoverWith { case _: R2dbcDataIntegrityViolationException =>
@@ -271,7 +276,7 @@ private[r2dbc] class DurableStateDao(settings: StateSettings, connectionFactory:
             .bind(0, state.revision)
             .bind(1, state.serId)
             .bind(2, state.serManifest)
-            .bind(3, state.payload.getOrElse(Array.emptyByteArray))
+            .bindPayloadOption(3, state.payload)
           bindTags(stmt, 4)
 
           if (settings.dbTimestampMonotonicIncreasing) {
@@ -347,7 +352,7 @@ private[r2dbc] class DurableStateDao(settings: StateSettings, connectionFactory:
                 .bind(3, revision)
                 .bind(4, 0)
                 .bind(5, "")
-                .bind(6, Array.emptyByteArray)
+                .bindPayloadOption(6, None)
                 .bindNull(7, classOf[Array[String]])
             }
             .recoverWith { case _: R2dbcDataIntegrityViolationException =>
@@ -363,7 +368,7 @@ private[r2dbc] class DurableStateDao(settings: StateSettings, connectionFactory:
               .bind(0, revision)
               .bind(1, 0)
               .bind(2, "")
-              .bind(3, Array.emptyByteArray)
+              .bindPayloadOption(3, None)
 
             if (settings.dbTimestampMonotonicIncreasing) {
               if (settings.durableStateAssertSingleWriter)
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/JsonSerializable.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/JsonSerializable.scala
new file mode 100644
index 0000000..77d7890
--- /dev/null
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/JsonSerializable.scala
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc
+
+trait JsonSerializable
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/PayloadSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/PayloadSpec.scala
new file mode 100644
index 0000000..1078856
--- /dev/null
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/PayloadSpec.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc
+
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.r2dbc.TestActors.DurableStatePersister
+import pekko.persistence.r2dbc.TestActors.Persister
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/**
+ * The purpose of this test is to verify JSONB payloads, but it can also be run with ordinary BYTEA payloads. To test
+ * with JSONB the db schema should be created with `ddl-scripts/create_tables_postgres_jsonb.sql` and start `sbt` with
+ *
+ * {{{
+ * sbt -Dpekko.persistence.r2dbc.journal.payload-column-type=JSONB -Dpekko.persistence.r2dbc.snapshot.payload-column-type=JSONB -Dpekko.persistence.r2dbc.state.payload-column-type=JSONB
+ * }}}
+ *
+ * Note that other tests may fail with JSONB column type because the test data isn't in json.
+ */
+object PayloadSpec {
+  val config = ConfigFactory
+    .parseString("""
+    pekko.serialization.jackson.jackson-json.compression.algorithm = off
+    """)
+    .withFallback(TestConfig.config)
+
+  final case class TestRow(pid: String, seqNr: Long, payload: Array[Byte])
+
+  final case class TestJson(a: String, i: Int) extends JsonSerializable
+
+  implicit class JsonString(val s: String) extends AnyVal {
+    // don't care about json formatting, which may be different for JSONB and BYTEA
+    def clearWhitespace: String = {
+      val sb = new StringBuilder(s.length)
+      s.foreach { ch =>
+        if (!ch.isWhitespace)
+          sb.append(ch)
+      }
+      sb.result()
+    }
+  }
+}
+
+class PayloadSpec
+    extends ScalaTestWithActorTestKit(PayloadSpec.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with LogCapturing {
+  import PayloadSpec._
+
+  override def typedSystem: ActorSystem[_] = system
+
+  private def testJournalPersister(persistenceId: String, msg: Any): Unit = {
+    val probe = createTestProbe[Any]()
+    val ref1 = spawn(Persister(persistenceId))
+    ref1 ! Persister.PersistWithAck(msg, probe.ref)
+    probe.expectMessage(Done)
+    testKit.stop(ref1)
+
+    val ref2 = spawn(Persister(persistenceId))
+    ref2 ! Persister.GetState(probe.ref)
+    probe.receiveMessage().toString.clearWhitespace shouldBe msg.toString.clearWhitespace
+    testKit.stop(ref2)
+  }
+
+  private def testDurableStatePersister(persistenceId: String, msg: Any): Unit = {
+    val probe = createTestProbe[Any]()
+    val ref1 = spawn(DurableStatePersister(persistenceId))
+    ref1 ! DurableStatePersister.PersistWithAck(msg, probe.ref)
+    probe.expectMessage(Done)
+    testKit.stop(ref1)
+
+    val ref2 = spawn(DurableStatePersister(persistenceId))
+    ref2 ! DurableStatePersister.GetState(probe.ref)
+    probe.receiveMessage().toString.clearWhitespace shouldBe msg.toString.clearWhitespace
+    testKit.stop(ref2)
+  }
+
+  private def selectJournalRow(persistenceId: String): TestRow = {
+    implicit val codec: PayloadCodec = journalSettings.journalPayloadCodec
+
+    r2dbcExecutor
+      .selectOne[TestRow]("test")(
+        connection =>
+          connection.createStatement(
+            s"select * from ${journalSettings.journalTableWithSchema} where persistence_id = '$persistenceId'"),
+        row => {
+          val payload = row.getPayload("event_payload")
+          TestRow(
+            pid = row.get("persistence_id", classOf[String]),
+            seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
+            payload)
+        })
+      .futureValue
+      .get
+  }
+
+  private def selectSnapshotRow(persistenceId: String): TestRow = {
+    implicit val codec: PayloadCodec = snapshotSettings.snapshotPayloadCodec
+
+    r2dbcExecutor
+      .selectOne[TestRow]("test")(
+        connection =>
+          connection.createStatement(
+            s"select * from ${snapshotSettings.snapshotsTableWithSchema} where persistence_id = '$persistenceId'"),
+        row => {
+          val payload = row.getPayload("snapshot")
+          TestRow(
+            pid = row.get("persistence_id", classOf[String]),
+            seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
+            payload)
+        })
+      .futureValue
+      .get
+  }
+
+  private def selectDurableStateRow(persistenceId: String): TestRow = {
+    implicit val codec: PayloadCodec = stateSettings.durableStatePayloadCodec
+
+    r2dbcExecutor
+      .selectOne[TestRow]("test")(
+        connection =>
+          connection.createStatement(
+            s"select * from ${stateSettings.durableStateTableWithSchema} where persistence_id = '$persistenceId'"),
+        row => {
+          val payload = row.getPayload("state_payload")
+          TestRow(
+            pid = row.get("persistence_id", classOf[String]),
+            seqNr = row.get[java.lang.Long]("revision", classOf[java.lang.Long]),
+            payload)
+        })
+      .futureValue
+      .get
+  }
+
+  "journal" should {
+    "store json string payload" in {
+      val persistenceId = nextPid(nextEntityType())
+      val msg = """{"a": "b"}"""
+      testJournalPersister(persistenceId, msg)
+
+      val row = selectJournalRow(persistenceId)
+      new String(row.payload, UTF_8) shouldBe msg
+    }
+
+    "store serialized json payload" in {
+      val persistenceId = nextPid(nextEntityType())
+      val msg = TestJson("b", 17)
+      testJournalPersister(persistenceId, msg)
+
+      val row = selectJournalRow(persistenceId)
+      new String(row.payload, UTF_8).clearWhitespace shouldBe """{"a":"b","i":17}""".clearWhitespace
+    }
+  }
+
+  "snapshot store" should {
+    "store json string payload" in {
+      val persistenceId = nextPid(nextEntityType())
+      val msg = """{"a": "snap"}"""
+      testJournalPersister(persistenceId, msg)
+
+      val row = selectSnapshotRow(persistenceId)
+      new String(row.payload, UTF_8).clearWhitespace shouldBe msg.clearWhitespace
+    }
+  }
+
+  "durable state" should {
+    "store json string payload" in {
+      val persistenceId = nextPid(nextEntityType())
+      val msg = """{"a": "b"}"""
+      testDurableStatePersister(persistenceId, msg)
+
+      val row = selectDurableStateRow(persistenceId)
+      new String(row.payload, UTF_8).clearWhitespace shouldBe msg.clearWhitespace
+    }
+
+    "store serialized json payload" in {
+      val persistenceId = nextPid(nextEntityType())
+      val msg = TestJson("b", 17)
+      testDurableStatePersister(persistenceId, msg)
+
+      val row = selectDurableStateRow(persistenceId)
+      new String(row.payload, UTF_8).clearWhitespace shouldBe """{"a":"b","i":17}""".clearWhitespace
+    }
+
+    "store delete marker" in {
+      val persistenceId = nextPid(nextEntityType())
+      val probe = createTestProbe[Any]()
+
+      val msg = """{"a": "to be deleted"}"""
+
+      // persist first so we have a known row at revision 1
+      val ref1 = spawn(DurableStatePersister(persistenceId))
+      ref1 ! DurableStatePersister.PersistWithAck(msg, probe.ref)
+      probe.expectMessage(Done)
+
+      val row1 = selectDurableStateRow(persistenceId)
+      new String(row1.payload, UTF_8).clearWhitespace shouldBe msg.clearWhitespace
+
+      // delete after change: updates the row to a delete marker (revision 2)
+      ref1 ! DurableStatePersister.DeleteWithAck(probe.ref)
+      probe.expectMessage(Done)
+      testKit.stop(ref1)
+
+      val row2 = selectDurableStateRow(persistenceId)
+      row2.payload.toVector shouldBe stateSettings.durableStatePayloadCodec.nonePayload.toVector
+
+      val ref2 = spawn(DurableStatePersister(persistenceId))
+      ref2 ! DurableStatePersister.GetState(probe.ref)
+      probe.expectMessage("") // after delete
+
+      // persist new state after the delete
+      val msg2 = """{"b": "new state"}"""
+      ref2 ! DurableStatePersister.PersistWithAck(msg2, probe.ref)
+      probe.expectMessage(Done)
+      testKit.stop(ref2)
+
+      val row3 = selectDurableStateRow(persistenceId)
+      new String(row3.payload, UTF_8).clearWhitespace shouldBe msg2.clearWhitespace
+
+      // delete again
+      val ref3 = spawn(DurableStatePersister(persistenceId))
+      ref3 ! DurableStatePersister.DeleteWithAck(probe.ref)
+      probe.expectMessage(Done)
+      testKit.stop(ref3)
+
+      val row4 = selectDurableStateRow(persistenceId)
+      row4.payload.toVector shouldBe stateSettings.durableStatePayloadCodec.nonePayload.toVector
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
index bf336a0..f555d9a 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestActors.scala
@@ -119,6 +119,7 @@ object TestActors {
     final case class PersistWithAck(payload: Any, replyTo: ActorRef[Done]) extends Command
     final case class DeleteWithAck(replyTo: ActorRef[Done]) extends Command
     final case class Ping(replyTo: ActorRef[Done]) extends Command
+    final case class GetState(replyTo: ActorRef[Any]) extends Command
     final case class Stop(replyTo: ActorRef[Done]) extends Command
 
     def apply(pid: String): Behavior[Command] =
@@ -129,7 +130,7 @@ object TestActors {
         DurableStateBehavior[Command, Any](
           persistenceId = pid,
           "",
-          { (_, command) =>
+          { (state, command) =>
             command match {
               case command: Persist =>
                 context.log.debug(
@@ -149,6 +150,9 @@ object TestActors {
                 context.log
                   .debug("Delete pid [{}], seqNr [{}]", pid.id, DurableStateBehavior.lastSequenceNumber(context) + 1)
                 Effect.delete[Any]().thenRun(_ => command.replyTo ! Done)
+              case GetState(replyTo) =>
+                replyTo ! state
+                Effect.none
               case Ping(replyTo) =>
                 replyTo ! Done
                 Effect.none
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
index 08789e5..f066678 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
@@ -70,6 +70,7 @@ object TestConfig {
     pekko.actor {
       serialization-bindings {
         "org.apache.pekko.persistence.r2dbc.CborSerializable" = jackson-cbor
+        "org.apache.pekko.persistence.r2dbc.JsonSerializable" = jackson-json
       }
     }
     pekko.actor.testkit.typed.default-timeout = 10s
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
index 5b29f6d..2b836c7 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
@@ -21,6 +21,8 @@ import pekko.Done
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.typed.ActorSystem
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichRow
 import pekko.persistence.r2dbc.JournalSettings
 import pekko.persistence.r2dbc.TestActors.Persister
 import pekko.persistence.r2dbc.TestConfig
@@ -40,6 +42,7 @@ class PersistTimestampSpec
   override def typedSystem: ActorSystem[_] = system
   private val settings = JournalSettings(system.settings.config.getConfig("pekko.persistence.r2dbc.journal"))
   private val serialization = SerializationExtension(system)
+  private implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec
 
   case class Row(pid: String, seqNr: Long, dbTimestamp: Instant, event: String)
 
@@ -79,7 +82,7 @@ class PersistTimestampSpec
             row => {
               val event = serialization
                 .deserialize(
-                  row.get("event_payload", classOf[Array[Byte]]),
+                  row.getPayload("event_payload"),
                   row.get[Integer]("event_ser_id", classOf[Integer]),
                   row.get("event_ser_manifest", classOf[String]))
                 .get
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index 061b6f9..e3e12fe 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -26,6 +26,8 @@ import pekko.persistence.query.PersistenceQuery
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.r2dbc.Dialect
 import pekko.persistence.r2dbc.QuerySettings
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichStatement
 import pekko.persistence.r2dbc.TestConfig
 import pekko.persistence.r2dbc.TestData
 import pekko.persistence.r2dbc.TestDbLifecycle
@@ -58,6 +60,7 @@ class EventsBySliceBacktrackingSpec
 
   override def typedSystem: ActorSystem[_] = system
   private val settings = QuerySettings(system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
+  private implicit val journalPayloadCodec: PayloadCodec = settings.journalPayloadCodec
 
   private val query = PersistenceQuery(testKit.system)
     .readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
@@ -83,7 +86,7 @@ class EventsBySliceBacktrackingSpec
         .bind(3, seqNr)
         .bind(4, timestamp)
         .bind(5, stringSerializer.identifier)
-        .bind(6, stringSerializer.toBinary(event))
+        .bindPayload(6, stringSerializer.toBinary(event))
     }
     result.futureValue shouldBe 1
   }
diff --git a/ddl-scripts/create_tables_postgres_jsonb.sql b/ddl-scripts/create_tables_postgres_jsonb.sql
new file mode 100644
index 0000000..6c2e36a
--- /dev/null
+++ b/ddl-scripts/create_tables_postgres_jsonb.sql
@@ -0,0 +1,97 @@
+-- # SPDX-License-Identifier: Apache-2.0
+
+CREATE TABLE IF NOT EXISTS event_journal(
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  db_timestamp timestamp with time zone NOT NULL,
+
+  event_ser_id INTEGER NOT NULL,
+  event_ser_manifest VARCHAR(255) NOT NULL,
+  event_payload JSONB NOT NULL,
+
+  deleted BOOLEAN DEFAULT FALSE NOT NULL,
+  writer VARCHAR(255) NOT NULL,
+  adapter_manifest VARCHAR(255),
+  tags TEXT ARRAY,
+
+  meta_ser_id INTEGER,
+  meta_ser_manifest VARCHAR(255),
+  meta_payload BYTEA,
+
+  PRIMARY KEY(persistence_id, seq_nr)
+);
+
+-- `event_journal_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice, entity_type, db_timestamp, seq_nr);
+
+CREATE TABLE IF NOT EXISTS snapshot(
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  write_timestamp BIGINT NOT NULL,
+  ser_id INTEGER NOT NULL,
+  ser_manifest VARCHAR(255) NOT NULL,
+  snapshot JSONB NOT NULL,
+  meta_ser_id INTEGER,
+  meta_ser_manifest VARCHAR(255),
+  meta_payload BYTEA,
+
+  PRIMARY KEY(persistence_id)
+);
+
+CREATE TABLE IF NOT EXISTS durable_state (
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  revision BIGINT NOT NULL,
+  db_timestamp timestamp with time zone NOT NULL,
+
+  state_ser_id INTEGER NOT NULL,
+  state_ser_manifest VARCHAR(255),
+  state_payload JSONB NOT NULL,
+  tags TEXT ARRAY,
+
+  PRIMARY KEY(persistence_id, revision)
+);
+
+-- `durable_state_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);
+
+-- Primitive offset types are stored in this table.
+-- If only timestamp based offsets are used this table is optional.
+-- Configure pekko.projection.r2dbc.offset-store.offset-table="" if the table is not created.
+CREATE TABLE IF NOT EXISTS pekko_projection_offset_store (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  current_offset VARCHAR(255) NOT NULL,
+  manifest VARCHAR(32) NOT NULL,
+  mergeable BOOLEAN NOT NULL,
+  last_updated BIGINT NOT NULL,
+  PRIMARY KEY(projection_name, projection_key)
+);
+
+-- Timestamp based offsets are stored in this table.
+CREATE TABLE IF NOT EXISTS pekko_projection_timestamp_offset_store (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  slice INT NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  -- timestamp_offset is the db_timestamp of the original event
+  timestamp_offset timestamp with time zone NOT NULL,
+  -- timestamp_consumed is when the offset was stored
+  -- the consumer lag is timestamp_consumed - timestamp_offset
+  timestamp_consumed timestamp with time zone NOT NULL,
+  PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
+);
+
+CREATE TABLE IF NOT EXISTS pekko_projection_management (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  paused BOOLEAN NOT NULL,
+  last_updated BIGINT NOT NULL,
+  PRIMARY KEY(projection_name, projection_key)
+);
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
index aeddf55..ecf6730 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -30,7 +30,8 @@ import pekko.actor.typed.Behavior
 import pekko.actor.typed.scaladsl.Behaviors
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.internal.PayloadCodec
+import pekko.persistence.r2dbc.internal.PayloadCodec.RichStatement
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import pekko.persistence.typed.PersistenceId
@@ -145,6 +146,7 @@ class EventSourcedEndToEndSpec
   private val querySettings = QuerySettings(system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
   private val projectionSettings = R2dbcProjectionSettings(system)
   private val stringSerializer = SerializationExtension(system).serializerFor(classOf[String])
+  private implicit val journalPayloadCodec: PayloadCodec = querySettings.journalPayloadCodec
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
@@ -171,7 +173,7 @@ class EventSourcedEndToEndSpec
         .bind(3, seqNr)
         .bind(4, timestamp)
         .bind(5, stringSerializer.identifier)
-        .bind(6, stringSerializer.toBinary(event))
+        .bindPayload(6, stringSerializer.toBinary(event))
     }
     result.futureValue shouldBe 1
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to