This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new e1fed42  add H2 version of new durable state tests (#280)
e1fed42 is described below

commit e1fed4242d76aa2f97995a51ca3b7f4dfba58af8
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Apr 14 16:35:39 2025 +0200

    add H2 version of new durable state tests (#280)
    
    * feat: fix durable state not using schemaName set in config correctly
    
    * fix: formatting
    
    * refactor new tests
    
    * temp disable test
    
    * Update DurableStateStorePluginSpec.scala
    
    * Update PostgresDurableStateStorePluginSpec.scala
    
    * Update SchemaUtilsImpl.scala
    
    * refactor
    
    * Update DurableStateStorePluginSpec.scala
    
    * close db
    
    * Update DurableStateStorePluginSpec.scala
    
    * Update DurableStateStorePluginSpec.scala
    
    ---------
    
    Co-authored-by: patsta32 <[email protected]>
---
 .../schema/h2/h2-create-schema-legacy.sql          |   6 +-
 .../main/resources/schema/h2/h2-create-schema.sql  |  18 ++--
 .../jdbc/testkit/internal/SchemaUtilsImpl.scala    |  31 +++++-
 .../scaladsl/DurableStateStorePluginSpec.scala     |  93 ++++++++++++++++-
 .../PostgresDurableStateStorePluginSpec.scala      | 110 ++-------------------
 5 files changed, 140 insertions(+), 118 deletions(-)

diff --git a/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql 
b/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql
index 53b0f36..8922d69 100644
--- a/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql
+++ b/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql
@@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" (
 );
 
 
-CREATE TABLE IF NOT EXISTS "durable_state" (
+CREATE TABLE IF NOT EXISTS PUBLIC."durable_state" (
     "global_offset" BIGINT NOT NULL AUTO_INCREMENT,
     "persistence_id" VARCHAR(255) NOT NULL,
     "revision" BIGINT NOT NULL,
@@ -30,5 +30,5 @@ CREATE TABLE IF NOT EXISTS "durable_state" (
     PRIMARY KEY("persistence_id")
     );
 
-CREATE INDEX "state_tag_idx" on "durable_state" ("tag");
-CREATE INDEX "state_global_offset_idx" on "durable_state" ("global_offset");
+CREATE INDEX "state_tag_idx" on PUBLIC."durable_state" ("tag");
+CREATE INDEX "state_global_offset_idx" on PUBLIC."durable_state" 
("global_offset");
diff --git a/core/src/main/resources/schema/h2/h2-create-schema.sql 
b/core/src/main/resources/schema/h2/h2-create-schema.sql
index ca44e87..ee6d0c9 100644
--- a/core/src/main/resources/schema/h2/h2-create-schema.sql
+++ b/core/src/main/resources/schema/h2/h2-create-schema.sql
@@ -1,4 +1,4 @@
-CREATE TABLE IF NOT EXISTS "event_journal" (
+CREATE TABLE IF NOT EXISTS PUBLIC."event_journal" (
     "ordering" BIGINT UNIQUE NOT NULL AUTO_INCREMENT,
     "deleted" BOOLEAN DEFAULT false NOT NULL,
     "persistence_id" VARCHAR(255) NOT NULL,
@@ -15,9 +15,9 @@ CREATE TABLE IF NOT EXISTS "event_journal" (
     PRIMARY KEY("persistence_id","sequence_number")
     );
 
-CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" 
("ordering");
+CREATE UNIQUE INDEX "event_journal_ordering_idx" on PUBLIC."event_journal" 
("ordering");
 
-CREATE TABLE IF NOT EXISTS "event_tag" (
+CREATE TABLE IF NOT EXISTS PUBLIC."event_tag" (
     "event_id" BIGINT NOT NULL,
     "tag" VARCHAR NOT NULL,
     PRIMARY KEY("event_id", "tag"),
@@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS "event_tag" (
       ON DELETE CASCADE
 );
 
-CREATE TABLE IF NOT EXISTS "snapshot" (
+CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" (
     "persistence_id" VARCHAR(255) NOT NULL,
     "sequence_number" BIGINT NOT NULL,
     "created" BIGINT NOT NULL,"snapshot_ser_id" INTEGER NOT NULL,
@@ -39,10 +39,10 @@ CREATE TABLE IF NOT EXISTS "snapshot" (
     PRIMARY KEY("persistence_id","sequence_number")
     );
 
-CREATE SEQUENCE IF NOT EXISTS "global_offset_seq";
+CREATE SEQUENCE IF NOT EXISTS PUBLIC."global_offset_seq";
 
-CREATE TABLE IF NOT EXISTS "durable_state" (
-    "global_offset" BIGINT DEFAULT NEXT VALUE FOR "global_offset_seq",
+CREATE TABLE IF NOT EXISTS PUBLIC."durable_state" (
+    "global_offset" BIGINT DEFAULT NEXT VALUE FOR PUBLIC."global_offset_seq",
     "persistence_id" VARCHAR(255) NOT NULL,
     "revision" BIGINT NOT NULL,
     "state_payload" BLOB NOT NULL,
@@ -52,5 +52,5 @@ CREATE TABLE IF NOT EXISTS "durable_state" (
     "state_timestamp" BIGINT NOT NULL,
     PRIMARY KEY("persistence_id")
     );
-CREATE INDEX IF NOT EXISTS "state_tag_idx" on "durable_state" ("tag");
-CREATE INDEX IF NOT EXISTS "state_global_offset_idx" on "durable_state" 
("global_offset");
+CREATE INDEX IF NOT EXISTS "state_tag_idx" on PUBLIC."durable_state" ("tag");
+CREATE INDEX IF NOT EXISTS "state_global_offset_idx" on PUBLIC."durable_state" 
("global_offset");
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala
index ed0f1d0..409aa22 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala
@@ -96,6 +96,18 @@ private[jdbc] object SchemaUtilsImpl {
     
SchemaUtilsImpl.applyScriptWithSlick(SchemaUtilsImpl.fromClasspathAsString(fileToLoad),
 separator, logger, db)
   }
 
+  /**
+   * INTERNAL API. Applies the drop script with every literal `oldSchemaName.` replaced by `newSchemaName.`.
+   */
+  @InternalApi
+  private[jdbc] def dropWithSlickButChangeSchema(schemaType: SchemaType, logger: Logger, db: Database,
+      oldSchemaName: String, newSchemaName: String): Done = {
+    val (fileToLoad, separator) = dropScriptFor(schemaType, false)
+    val script = SchemaUtilsImpl.fromClasspathAsString(fileToLoad)
+      .replace(s"$oldSchemaName.", s"$newSchemaName.") // literal; replaceAll treats "." as a regex wildcard
+    SchemaUtilsImpl.applyScriptWithSlick(script, separator, logger, db)
+  }
+
   /**
    * INTERNAL API
    */
@@ -105,6 +117,19 @@ private[jdbc] object SchemaUtilsImpl {
     
SchemaUtilsImpl.applyScriptWithSlick(SchemaUtilsImpl.fromClasspathAsString(fileToLoad),
 separator, logger, db)
   }
 
+  /**
+   * INTERNAL API. Prepends CREATE SCHEMA for `newSchemaName` and rewrites literal `oldSchemaName.` in the create script.
+   */
+  @InternalApi
+  private[jdbc] def createWithSlickButChangeSchema(schemaType: SchemaType, logger: Logger, db: Database,
+      oldSchemaName: String, newSchemaName: String): Done = {
+    val (fileToLoad, separator) = createScriptFor(schemaType, false)
+    val script = SchemaUtilsImpl.fromClasspathAsString(fileToLoad)
+      .replace(s"$oldSchemaName.", s"$newSchemaName.") // literal; replaceAll treats "." as a regex wildcard
+    val scriptWithSchemaCreate = s"CREATE SCHEMA IF NOT EXISTS $newSchemaName$separator$script"
+    SchemaUtilsImpl.applyScriptWithSlick(scriptWithSchemaCreate, separator, logger, db)
+  }
+
   private def applyScriptWithSlick(script: String, separator: String, logger: 
Logger, database: Database): Done = {
 
     def withStatement(f: Statement => Unit): Done = {
@@ -152,7 +177,11 @@ private[jdbc] object SchemaUtilsImpl {
     }
   }
 
-  private def slickProfileToSchemaType(profile: JdbcProfile): SchemaType =
+  /**
+   * INTERNAL API
+   */
+  @InternalApi
+  private[jdbc] def slickProfileToSchemaType(profile: JdbcProfile): SchemaType 
=
     profile match {
       case PostgresProfile  => Postgres
       case MySQLProfile     => MySQL
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
index 82af13d..36c7d61 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
@@ -12,13 +12,22 @@ package org.apache.pekko.persistence.jdbc.state.scaladsl
 import com.typesafe.config.{ Config, ConfigFactory }
 import org.apache.pekko
 import pekko.actor._
+import pekko.persistence.jdbc.config.SlickConfiguration
+import pekko.persistence.jdbc.db.SlickDatabase
+import pekko.persistence.jdbc.testkit.internal.SchemaUtilsImpl
 import pekko.persistence.state.DurableStateStoreRegistry
+import pekko.util.Timeout
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{ Millis, Seconds, Span }
 import org.scalatest.wordspec.AnyWordSpecLike
 import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.ScalaFutures
+import org.slf4j.LoggerFactory
 import slick.jdbc.{ H2Profile, JdbcProfile }
 
+import scala.concurrent.duration.DurationInt
+
 abstract class DurableStateStorePluginSpec(config: Config, profile: 
JdbcProfile)
     extends AnyWordSpecLike
     with BeforeAndAfterAll
@@ -45,5 +54,87 @@ abstract class DurableStateStorePluginSpec(config: Config, 
profile: JdbcProfile)
   }
 }
 
+abstract class DurableStateStoreSchemaPluginSpec(val config: Config, profile: 
JdbcProfile)
+    extends AnyWordSpecLike
+    with BeforeAndAfterAll
+    with Matchers
+    with ScalaFutures
+    with DataGenerationHelper {
+
+  private val logger = LoggerFactory.getLogger(this.getClass)
+  protected def defaultSchemaName: String = "public"
+  val schemaName: String = "pekko"
+  implicit val timeout: Timeout = Timeout(1.minute)
+  implicit val defaultPatience: PatienceConfig =
+    PatienceConfig(timeout = Span(60, Seconds), interval = Span(100, Millis))
+
+  val customConfig: Config = ConfigFactory.parseString("""
+    jdbc-durable-state-store {
+      tables {
+        durable_state {
+          schemaName = "pekko"
+        }
+      }
+    }
+  """)
+
+  implicit lazy val system: ExtendedActorSystem =
+    ActorSystem(
+      "test",
+      customConfig.withFallback(config)
+    ).asInstanceOf[ExtendedActorSystem]
+
+  lazy val db = SlickDatabase.database(
+    config,
+    new SlickConfiguration(config.getConfig("slick")), "slick.db"
+  )
+
+  override def beforeAll(): Unit =
+    SchemaUtilsImpl.createWithSlickButChangeSchema(
+      SchemaUtilsImpl.slickProfileToSchemaType(profile),
+      logger, db, defaultSchemaName, schemaName)
+
+  override def afterAll(): Unit = {
+    SchemaUtilsImpl.dropWithSlickButChangeSchema(
+      SchemaUtilsImpl.slickProfileToSchemaType(profile),
+      logger, db, defaultSchemaName, schemaName)
+    db.close()
+    system.terminate().futureValue
+  }
+
+  "A durable state store plugin" must {
+    "instantiate a JdbcDurableDataStore successfully" in {
+
+      val store = DurableStateStoreRegistry
+        .get(system)
+        
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
+
+      store shouldBe a[JdbcDurableStateStore[_]]
+      store.system.settings.config shouldBe system.settings.config
+      store.profile shouldBe profile
+    }
+
+    "persist states successfully" in {
+
+      val store = DurableStateStoreRegistry
+        .get(system)
+        
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
+
+      upsertManyForOnePersistenceId(store, "durable_state", "durable-t1", 1, 
400).size shouldBe 400
+
+      eventually {
+        store.maxStateStoreOffset().futureValue shouldBe 400
+      }
+    }
+  }
+
+}
+
 class H2DurableStateStorePluginSpec
     extends 
DurableStateStorePluginSpec(ConfigFactory.load("h2-application.conf"), 
H2Profile)
+
+class H2DurableStateStorePluginSchemaSpec
+    extends 
DurableStateStoreSchemaPluginSpec(ConfigFactory.load("h2-application.conf"),
+      H2Profile) {
+  override protected def defaultSchemaName: String = "PUBLIC"
+}
diff --git 
a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala
 
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala
index 1fc25d6..b0b1975 100644
--- 
a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala
+++ 
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala
@@ -9,114 +9,16 @@
 
 package org.apache.pekko.persistence.jdbc.integration
 
-import com.typesafe.config.{ Config, ConfigFactory }
-import org.apache.pekko
-import pekko.util.Timeout
-import pekko.actor.{ ActorSystem, ExtendedActorSystem }
-import pekko.persistence.jdbc.config.SlickConfiguration
-import pekko.persistence.jdbc.db.SlickDatabase
-import pekko.persistence.jdbc.state.scaladsl.{
-  DataGenerationHelper,
+import com.typesafe.config.ConfigFactory
+import slick.jdbc.PostgresProfile
+import org.apache.pekko.persistence.jdbc.state.scaladsl.{
   DurableStateStorePluginSpec,
-  JdbcDurableStateStore
+  DurableStateStoreSchemaPluginSpec
 }
-import pekko.persistence.jdbc.testkit.internal.Postgres
-import pekko.persistence.jdbc.util.DropCreate
-import pekko.persistence.state.DurableStateStoreRegistry
-import slick.jdbc.PostgresProfile
-import org.scalatest.concurrent.Eventually.eventually
-import org.scalatest.concurrent.ScalaFutures
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.time.{ Millis, Seconds, Span }
-import org.scalatest.wordspec.AnyWordSpecLike
-import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
-import scala.concurrent.duration.DurationInt
 
 class PostgresDurableStateStorePluginSpec
     extends 
DurableStateStorePluginSpec(ConfigFactory.load("postgres-shared-db-application.conf"),
 PostgresProfile) {}
 
 class PostgresDurableStateStorePluginSchemaSpec
-    extends AnyWordSpecLike
-    with BeforeAndAfterAll
-    with BeforeAndAfterEach
-    with Matchers
-    with ScalaFutures
-    with DropCreate
-    with DataGenerationHelper {
-
-  val profile = PostgresProfile
-  val config = ConfigFactory.load("postgres-application.conf")
-  val schemaName: String = "pekko"
-  implicit val timeout: Timeout = Timeout(1.minute)
-  implicit val defaultPatience: PatienceConfig =
-    PatienceConfig(timeout = Span(60, Seconds), interval = Span(100, Millis))
-
-  val customConfig: Config = ConfigFactory.parseString(s"""
-    jdbc-durable-state-store {
-      tables {
-        durable_state {
-          schemaName = "pekko"
-        }
-      }
-    }
-  """)
-
-  implicit lazy val system: ExtendedActorSystem =
-    ActorSystem(
-      "test",
-      customConfig.withFallback(config)
-    ).asInstanceOf[ExtendedActorSystem]
-
-  lazy val db = SlickDatabase.database(
-    config,
-    new SlickConfiguration(config.getConfig("slick")), "slick.db"
-  )
-
-  private val createSchema = s"CREATE SCHEMA IF NOT EXISTS $schemaName;"
-  private val moveDurableStateTableToSchema = s"alter table 
public.durable_state set schema $schemaName;"
-  private val moveDurableStateTableToPublic = s"alter table 
$schemaName.durable_state set schema public;"
-  private val createSchemaAndMoveTable = 
s"${createSchema}${moveDurableStateTableToSchema}"
-
-  override def beforeAll(): Unit = {
-    dropAndCreate(Postgres)
-  }
-
-  override def beforeEach(): Unit = {
-    withStatement(_.execute(createSchemaAndMoveTable))
-  }
-
-  "A durable state store plugin" must {
-    "instantiate a JdbcDurableDataStore successfully" in {
-
-      val store = DurableStateStoreRegistry
-        .get(system)
-        
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
-
-      store shouldBe a[JdbcDurableStateStore[_]]
-      store.system.settings.config shouldBe system.settings.config
-      store.profile shouldBe profile
-    }
-
-    "persist states successfully" in {
-
-      val store = DurableStateStoreRegistry
-        .get(system)
-        
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)
-
-      upsertManyForOnePersistenceId(store, "durable_state", "durable-t1", 1, 
400).size shouldBe 400
-
-      eventually {
-        store.maxStateStoreOffset().futureValue shouldBe 400
-      }
-    }
-  }
-
-  override def afterEach(): Unit = {
-    withStatement(_.execute(moveDurableStateTableToPublic))
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate().futureValue
-
-  }
-}
+    extends 
DurableStateStoreSchemaPluginSpec(ConfigFactory.load("postgres-application.conf"),
+      PostgresProfile) {}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to