This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new e1fed42 add H2 version of new durable state tests (#280)
e1fed42 is described below
commit e1fed4242d76aa2f97995a51ca3b7f4dfba58af8
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Apr 14 16:35:39 2025 +0200
add H2 version of new durable state tests (#280)
* feat: fix durable state not using schemaName set in config correctly
* fix: formatting
* refactor new tests
* temp disable test
* Update DurableStateStorePluginSpec.scala
* Update PostgresDurableStateStorePluginSpec.scala
* Update SchemaUtilsImpl.scala
* refactor
* Update DurableStateStorePluginSpec.scala
* close db
* Update DurableStateStorePluginSpec.scala
* Update DurableStateStorePluginSpec.scala
---------
Co-authored-by: patsta32 <[email protected]>
---
.../schema/h2/h2-create-schema-legacy.sql | 6 +-
.../main/resources/schema/h2/h2-create-schema.sql | 18 ++--
.../jdbc/testkit/internal/SchemaUtilsImpl.scala | 31 +++++-
.../scaladsl/DurableStateStorePluginSpec.scala | 93 ++++++++++++++++-
.../PostgresDurableStateStorePluginSpec.scala | 110 ++-------------------
5 files changed, 140 insertions(+), 118 deletions(-)
diff --git a/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql
b/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql
index 53b0f36..8922d69 100644
--- a/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql
+++ b/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql
@@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" (
);
-CREATE TABLE IF NOT EXISTS "durable_state" (
+CREATE TABLE IF NOT EXISTS PUBLIC."durable_state" (
"global_offset" BIGINT NOT NULL AUTO_INCREMENT,
"persistence_id" VARCHAR(255) NOT NULL,
"revision" BIGINT NOT NULL,
@@ -30,5 +30,5 @@ CREATE TABLE IF NOT EXISTS "durable_state" (
PRIMARY KEY("persistence_id")
);
-CREATE INDEX "state_tag_idx" on "durable_state" ("tag");
-CREATE INDEX "state_global_offset_idx" on "durable_state" ("global_offset");
+CREATE INDEX "state_tag_idx" on PUBLIC."durable_state" ("tag");
+CREATE INDEX "state_global_offset_idx" on PUBLIC."durable_state"
("global_offset");
diff --git a/core/src/main/resources/schema/h2/h2-create-schema.sql
b/core/src/main/resources/schema/h2/h2-create-schema.sql
index ca44e87..ee6d0c9 100644
--- a/core/src/main/resources/schema/h2/h2-create-schema.sql
+++ b/core/src/main/resources/schema/h2/h2-create-schema.sql
@@ -1,4 +1,4 @@
-CREATE TABLE IF NOT EXISTS "event_journal" (
+CREATE TABLE IF NOT EXISTS PUBLIC."event_journal" (
"ordering" BIGINT UNIQUE NOT NULL AUTO_INCREMENT,
"deleted" BOOLEAN DEFAULT false NOT NULL,
"persistence_id" VARCHAR(255) NOT NULL,
@@ -15,9 +15,9 @@ CREATE TABLE IF NOT EXISTS "event_journal" (
PRIMARY KEY("persistence_id","sequence_number")
);
-CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal"
("ordering");
+CREATE UNIQUE INDEX "event_journal_ordering_idx" on PUBLIC."event_journal"
("ordering");
-CREATE TABLE IF NOT EXISTS "event_tag" (
+CREATE TABLE IF NOT EXISTS PUBLIC."event_tag" (
"event_id" BIGINT NOT NULL,
"tag" VARCHAR NOT NULL,
PRIMARY KEY("event_id", "tag"),
@@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS "event_tag" (
ON DELETE CASCADE
);
-CREATE TABLE IF NOT EXISTS "snapshot" (
+CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" (
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"created" BIGINT NOT NULL,"snapshot_ser_id" INTEGER NOT NULL,
@@ -39,10 +39,10 @@ CREATE TABLE IF NOT EXISTS "snapshot" (
PRIMARY KEY("persistence_id","sequence_number")
);
-CREATE SEQUENCE IF NOT EXISTS "global_offset_seq";
+CREATE SEQUENCE IF NOT EXISTS PUBLIC."global_offset_seq";
-CREATE TABLE IF NOT EXISTS "durable_state" (
- "global_offset" BIGINT DEFAULT NEXT VALUE FOR "global_offset_seq",
+CREATE TABLE IF NOT EXISTS PUBLIC."durable_state" (
+ "global_offset" BIGINT DEFAULT NEXT VALUE FOR PUBLIC."global_offset_seq",
"persistence_id" VARCHAR(255) NOT NULL,
"revision" BIGINT NOT NULL,
"state_payload" BLOB NOT NULL,
@@ -52,5 +52,5 @@ CREATE TABLE IF NOT EXISTS "durable_state" (
"state_timestamp" BIGINT NOT NULL,
PRIMARY KEY("persistence_id")
);
-CREATE INDEX IF NOT EXISTS "state_tag_idx" on "durable_state" ("tag");
-CREATE INDEX IF NOT EXISTS "state_global_offset_idx" on "durable_state"
("global_offset");
+CREATE INDEX IF NOT EXISTS "state_tag_idx" on PUBLIC."durable_state" ("tag");
+CREATE INDEX IF NOT EXISTS "state_global_offset_idx" on PUBLIC."durable_state"
("global_offset");
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala
index ed0f1d0..409aa22 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala
@@ -96,6 +96,18 @@ private[jdbc] object SchemaUtilsImpl {
SchemaUtilsImpl.applyScriptWithSlick(SchemaUtilsImpl.fromClasspathAsString(fileToLoad),
separator, logger, db)
}
+ /**
+ * INTERNAL API
+ */
+ @InternalApi
+ private[jdbc] def dropWithSlickButChangeSchema(schemaType: SchemaType,
logger: Logger, db: Database,
+ oldSchemaName: String, newSchemaName: String): Done = {
+ val (fileToLoad, separator) = dropScriptFor(schemaType, false)
+ val script = SchemaUtilsImpl.fromClasspathAsString(fileToLoad)
+ .replaceAll(s"$oldSchemaName.", s"$newSchemaName.")
+ SchemaUtilsImpl.applyScriptWithSlick(script, separator, logger, db)
+ }
+
/**
* INTERNAL API
*/
@@ -105,6 +117,19 @@ private[jdbc] object SchemaUtilsImpl {
SchemaUtilsImpl.applyScriptWithSlick(SchemaUtilsImpl.fromClasspathAsString(fileToLoad),
separator, logger, db)
}
+ /**
+ * INTERNAL API
+ */
+ @InternalApi
+ private[jdbc] def createWithSlickButChangeSchema(schemaType: SchemaType,
logger: Logger, db: Database,
+ oldSchemaName: String, newSchemaName: String): Done = {
+ val (fileToLoad, separator) = createScriptFor(schemaType, false)
+ val script = SchemaUtilsImpl.fromClasspathAsString(fileToLoad)
+ .replaceAll(s"$oldSchemaName.", s"$newSchemaName.")
+ val scriptWithSchemaCreate = s"CREATE SCHEMA IF NOT EXISTS
$newSchemaName$separator$script"
+ SchemaUtilsImpl.applyScriptWithSlick(scriptWithSchemaCreate, separator,
logger, db)
+ }
+
private def applyScriptWithSlick(script: String, separator: String, logger:
Logger, database: Database): Done = {
def withStatement(f: Statement => Unit): Done = {
@@ -152,7 +177,11 @@ private[jdbc] object SchemaUtilsImpl {
}
}
- private def slickProfileToSchemaType(profile: JdbcProfile): SchemaType =
+ /**
+ * INTERNAL API
+ */
+ @InternalApi
+ private[jdbc] def slickProfileToSchemaType(profile: JdbcProfile): SchemaType
=
profile match {
case PostgresProfile => Postgres
case MySQLProfile => MySQL
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
index 82af13d..36c7d61 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
@@ -12,13 +12,22 @@ package org.apache.pekko.persistence.jdbc.state.scaladsl
import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.pekko
import pekko.actor._
+import pekko.persistence.jdbc.config.SlickConfiguration
+import pekko.persistence.jdbc.db.SlickDatabase
+import pekko.persistence.jdbc.testkit.internal.SchemaUtilsImpl
import pekko.persistence.state.DurableStateStoreRegistry
+import pekko.util.Timeout
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.ScalaFutures
+import org.slf4j.LoggerFactory
import slick.jdbc.{ H2Profile, JdbcProfile }
+import scala.concurrent.duration.DurationInt
+
abstract class DurableStateStorePluginSpec(config: Config, profile:
JdbcProfile)
extends AnyWordSpecLike
with BeforeAndAfterAll
@@ -45,5 +54,87 @@ abstract class DurableStateStorePluginSpec(config: Config,
profile: JdbcProfile)
}
}
+abstract class DurableStateStoreSchemaPluginSpec(val config: Config, profile:
JdbcProfile)
+ extends AnyWordSpecLike
+ with BeforeAndAfterAll
+ with Matchers
+ with ScalaFutures
+ with DataGenerationHelper {
+
+ private val logger = LoggerFactory.getLogger(this.getClass)
+ protected def defaultSchemaName: String = "public"
+ val schemaName: String = "pekko"
+ implicit val timeout: Timeout = Timeout(1.minute)
+ implicit val defaultPatience: PatienceConfig =
+ PatienceConfig(timeout = Span(60, Seconds), interval = Span(100, Millis))
+
+ val customConfig: Config = ConfigFactory.parseString("""
+ jdbc-durable-state-store {
+ tables {
+ durable_state {
+ schemaName = "pekko"
+ }
+ }
+ }
+ """)
+
+ implicit lazy val system: ExtendedActorSystem =
+ ActorSystem(
+ "test",
+ customConfig.withFallback(config)
+ ).asInstanceOf[ExtendedActorSystem]
+
+ lazy val db = SlickDatabase.database(
+ config,
+ new SlickConfiguration(config.getConfig("slick")), "slick.db"
+ )
+
+ override def beforeAll(): Unit =
+ SchemaUtilsImpl.createWithSlickButChangeSchema(
+ SchemaUtilsImpl.slickProfileToSchemaType(profile),
+ logger, db, defaultSchemaName, schemaName)
+
+ override def afterAll(): Unit = {
+ SchemaUtilsImpl.dropWithSlickButChangeSchema(
+ SchemaUtilsImpl.slickProfileToSchemaType(profile),
+ logger, db, defaultSchemaName, schemaName)
+ db.close()
+ system.terminate().futureValue
+ }
+
+ "A durable state store plugin" must {
+ "instantiate a JdbcDurableDataStore successfully" in {
+
+ val store = DurableStateStoreRegistry
+ .get(system)
+
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
+
+ store shouldBe a[JdbcDurableStateStore[_]]
+ store.system.settings.config shouldBe system.settings.config
+ store.profile shouldBe profile
+ }
+
+ "persist states successfully" in {
+
+ val store = DurableStateStoreRegistry
+ .get(system)
+
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
+
+ upsertManyForOnePersistenceId(store, "durable_state", "durable-t1", 1,
400).size shouldBe 400
+
+ eventually {
+ store.maxStateStoreOffset().futureValue shouldBe 400
+ }
+ }
+ }
+
+}
+
class H2DurableStateStorePluginSpec
extends
DurableStateStorePluginSpec(ConfigFactory.load("h2-application.conf"),
H2Profile)
+
+class H2DurableStateStorePluginSchemaSpec
+ extends
DurableStateStoreSchemaPluginSpec(ConfigFactory.load("h2-application.conf"),
+ H2Profile) {
+ override protected def defaultSchemaName: String = "PUBLIC"
+}
diff --git
a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala
index 1fc25d6..b0b1975 100644
---
a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala
+++
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala
@@ -9,114 +9,16 @@
package org.apache.pekko.persistence.jdbc.integration
-import com.typesafe.config.{ Config, ConfigFactory }
-import org.apache.pekko
-import pekko.util.Timeout
-import pekko.actor.{ ActorSystem, ExtendedActorSystem }
-import pekko.persistence.jdbc.config.SlickConfiguration
-import pekko.persistence.jdbc.db.SlickDatabase
-import pekko.persistence.jdbc.state.scaladsl.{
- DataGenerationHelper,
+import com.typesafe.config.ConfigFactory
+import slick.jdbc.PostgresProfile
+import org.apache.pekko.persistence.jdbc.state.scaladsl.{
DurableStateStorePluginSpec,
- JdbcDurableStateStore
+ DurableStateStoreSchemaPluginSpec
}
-import pekko.persistence.jdbc.testkit.internal.Postgres
-import pekko.persistence.jdbc.util.DropCreate
-import pekko.persistence.state.DurableStateStoreRegistry
-import slick.jdbc.PostgresProfile
-import org.scalatest.concurrent.Eventually.eventually
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.time.{ Millis, Seconds, Span }
-import org.scalatest.wordspec.AnyWordSpecLike
-import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
-import scala.concurrent.duration.DurationInt
class PostgresDurableStateStorePluginSpec
extends
DurableStateStorePluginSpec(ConfigFactory.load("postgres-shared-db-application.conf"),
PostgresProfile) {}
class PostgresDurableStateStorePluginSchemaSpec
- extends AnyWordSpecLike
- with BeforeAndAfterAll
- with BeforeAndAfterEach
- with Matchers
- with ScalaFutures
- with DropCreate
- with DataGenerationHelper {
-
- val profile = PostgresProfile
- val config = ConfigFactory.load("postgres-application.conf")
- val schemaName: String = "pekko"
- implicit val timeout: Timeout = Timeout(1.minute)
- implicit val defaultPatience: PatienceConfig =
- PatienceConfig(timeout = Span(60, Seconds), interval = Span(100, Millis))
-
- val customConfig: Config = ConfigFactory.parseString(s"""
- jdbc-durable-state-store {
- tables {
- durable_state {
- schemaName = "pekko"
- }
- }
- }
- """)
-
- implicit lazy val system: ExtendedActorSystem =
- ActorSystem(
- "test",
- customConfig.withFallback(config)
- ).asInstanceOf[ExtendedActorSystem]
-
- lazy val db = SlickDatabase.database(
- config,
- new SlickConfiguration(config.getConfig("slick")), "slick.db"
- )
-
- private val createSchema = s"CREATE SCHEMA IF NOT EXISTS $schemaName;"
- private val moveDurableStateTableToSchema = s"alter table
public.durable_state set schema $schemaName;"
- private val moveDurableStateTableToPublic = s"alter table
$schemaName.durable_state set schema public;"
- private val createSchemaAndMoveTable =
s"${createSchema}${moveDurableStateTableToSchema}"
-
- override def beforeAll(): Unit = {
- dropAndCreate(Postgres)
- }
-
- override def beforeEach(): Unit = {
- withStatement(_.execute(createSchemaAndMoveTable))
- }
-
- "A durable state store plugin" must {
- "instantiate a JdbcDurableDataStore successfully" in {
-
- val store = DurableStateStoreRegistry
- .get(system)
-
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
-
- store shouldBe a[JdbcDurableStateStore[_]]
- store.system.settings.config shouldBe system.settings.config
- store.profile shouldBe profile
- }
-
- "persist states successfully" in {
-
- val store = DurableStateStoreRegistry
- .get(system)
-
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
-
- upsertManyForOnePersistenceId(store, "durable_state", "durable-t1", 1,
400).size shouldBe 400
-
- eventually {
- store.maxStateStoreOffset().futureValue shouldBe 400
- }
- }
- }
-
- override def afterEach(): Unit = {
- withStatement(_.execute(moveDurableStateTableToPublic))
- }
-
- override def afterAll(): Unit = {
- system.terminate().futureValue
-
- }
-}
+ extends
DurableStateStoreSchemaPluginSpec(ConfigFactory.load("postgres-application.conf"),
+ PostgresProfile) {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]