This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pekko-persistence-cassandra.git


The following commit(s) were added to refs/heads/main by this push:
     new 049e29b  pekko 1.3.0 (#334)
049e29b is described below

commit 049e29b3b4372f545525b072a59a6fd62add9f24
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Nov 26 12:07:36 2025 +0100

    pekko 1.3.0 (#334)
---
 .../cassandra/EventsByTagMultiJvmSpec.scala        |  2 +-
 .../cassandra/CassandraEventsByTagLoadSpec.scala   |  2 +-
 .../persistence/cassandra/CassandraSpec.scala      |  4 +-
 .../cassandra/EventsByTagCrashSpec.scala           |  2 +-
 .../cassandra/EventsByTagMigrationSpec.scala       | 24 +++---
 .../cassandra/EventsByTagRecoverySpec.scala        | 10 +--
 .../cassandra/EventsByTagRestartSpec.scala         |  6 +-
 .../cassandra/EventsByTagStressSpec.scala          |  2 +-
 .../cassandra/query/AllPersistenceIdsSpec.scala    | 16 ++--
 .../query/CassandraQueryJournalOverrideSpec.scala  |  6 +-
 .../cassandra/query/EventAdaptersReadSpec.scala    | 12 +--
 .../EventsByPersistenceIdFastForwardSpec.scala     |  2 +-
 ...ventsByPersistenceIdMultiPartitionGapSpec.scala | 10 +--
 .../query/EventsByPersistenceIdSpec.scala          | 54 ++++++-------
 .../EventsByPersistenceIdWithControlSpec.scala     | 10 +--
 .../cassandra/query/EventsByTagPubsubSpec.scala    |  2 +-
 .../cassandra/query/EventsByTagSpec.scala          | 90 +++++++++++-----------
 .../cassandra/query/EventsByTagStageSpec.scala     | 38 ++++-----
 .../query/javadsl/CassandraReadJournalSpec.scala   |  8 +-
 .../query/scaladsl/CassandraReadJournalSpec.scala  |  8 +-
 project/PekkoConnectorsDependency.scala            |  2 +-
 project/PekkoCoreDependency.scala                  |  2 +-
 project/PekkoManagementDependency.scala            |  2 +-
 23 files changed, 157 insertions(+), 157 deletions(-)

diff --git a/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala b/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
index f1545e6..f29c923 100644
--- a/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
+++ b/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
@@ -109,7 +109,7 @@ abstract class EventsByTagMultiJvmSpec
           queryJournal
             .eventsByTag("all", NoOffset)
             .map(e => (e.persistenceId, e.event.asInstanceOf[Int]))
-            .runWith(TestSink.probe)
+            .runWith(TestSink())
         }
       }
       enterBarrier("query-started")
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
index ee76f67..ddceff2 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
@@ -83,7 +83,7 @@ class CassandraEventsByTagLoadSpec extends 
CassandraSpec(CassandraEventsByTagLoa
 
   private def validateTagStream(readJournal: CassandraReadJournal)(tag: 
String): Unit = {
     system.log.info(s"Validating tag $tag")
-    val probe = readJournal.eventsByTag("orange", 
NoOffset).toMat(TestSink.probe)(Keep.right).run()
+    val probe = readJournal.eventsByTag("orange", 
NoOffset).toMat(TestSink())(Keep.right).run()
     var sequenceNrsPerPid = Map[String, Long]()
     var allReceived: Map[String, List[Long]] = 
Map.empty.withDefaultValue(List.empty)
     probe.request(messagesPerPersistenceId * nrPersistenceIds)
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraSpec.scala
index d2e5af1..ac715b8 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/CassandraSpec.scala
@@ -304,10 +304,10 @@ abstract class CassandraSpec(
       .futureValue
 
   def eventsByTag(tag: String): TestSubscriber.Probe[Any] =
-    queries.eventsByTag(tag, NoOffset).map(_.event).runWith(TestSink.probe)
+    queries.eventsByTag(tag, NoOffset).map(_.event).runWith(TestSink())
 
   def expectEventsForTag(tag: String, elements: String*): Unit = {
-    val probe = queries.eventsByTag(tag, 
NoOffset).map(_.event).runWith(TestSink.probe)
+    val probe = queries.eventsByTag(tag, 
NoOffset).map(_.event).runWith(TestSink())
 
     probe.request(elements.length + 1)
     elements.foreach(probe.expectNext)
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagCrashSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagCrashSpec.scala
index b74c6e9..3bfcfed 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagCrashSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagCrashSpec.scala
@@ -43,7 +43,7 @@ class EventsByTagCrashSpec extends 
CassandraSpec(EventsByTagRestartSpec.config)
         expectMsg(Ack)
       }
       val blueTags: Source[EventEnvelope, NotUsed] = 
queryJournal.eventsByTag(tag = "blue", offset = NoOffset)
-      val tagProbe = blueTags.runWith(TestSink.probe[EventEnvelope](system))
+      val tagProbe = blueTags.runWith(TestSink[EventEnvelope]()(system))
       (1L to msgs).foreach { m =>
         val expected = s"msg $m"
         tagProbe.request(1)
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigrationSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigrationSpec.scala
index 4870fbe..fbbca56 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigrationSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagMigrationSpec.scala
@@ -91,7 +91,7 @@ class EventsByTagMigrationProvidePersistenceIds extends 
AbstractEventsByTagMigra
       migrator.migratePidsToTagViews(List(pidOne)).futureValue shouldEqual Done
 
       val blueSrc = queries.eventsByTag("blue", NoOffset)
-      val blueProbe = blueSrc.runWith(TestSink.probe[Any])
+      val blueProbe = blueSrc.runWith(TestSink[Any]())
       blueProbe.request(5)
       blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
       blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 2, "e-2") => }
@@ -101,7 +101,7 @@ class EventsByTagMigrationProvidePersistenceIds extends 
AbstractEventsByTagMigra
       migrator.migratePidsToTagViews(List(pidTwo)).futureValue shouldEqual Done
 
       val blueSrcTakeTwo = queries.eventsByTag("blue", NoOffset)
-      val blueProbeTakeTwo = blueSrcTakeTwo.runWith(TestSink.probe[Any])
+      val blueProbeTakeTwo = blueSrcTakeTwo.runWith(TestSink[Any]())
       blueProbeTakeTwo.request(5)
       blueProbeTakeTwo.expectNextPF { case EventEnvelope(_, `pidOne`, 1, 
"e-1") => }
       blueProbeTakeTwo.expectNextPF { case EventEnvelope(_, `pidOne`, 2, 
"e-2") => }
@@ -188,7 +188,7 @@ class EventsByTagMigrationSpec extends 
AbstractEventsByTagMigrationSpec {
 
     "work with the current implementation" taggedAs RequiresCassandraThree in {
       val blueSrc: Source[EventEnvelope, NotUsed] = 
queries.eventsByTag("blue", NoOffset)
-      val blueProbe = blueSrc.runWith(TestSink.probe[Any])
+      val blueProbe = blueSrc.runWith(TestSink[Any]())
       blueProbe.request(5)
       blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
       blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 2, "e-2") => }
@@ -201,7 +201,7 @@ class EventsByTagMigrationSpec extends 
AbstractEventsByTagMigrationSpec {
       blueProbe.cancel()
 
       val greenSrc: Source[EventEnvelope, NotUsed] = 
queries.eventsByTag("green", NoOffset)
-      val greenProbe = greenSrc.runWith(TestSink.probe[Any])
+      val greenProbe = greenSrc.runWith(TestSink[Any]())
       greenProbe.request(4)
       greenProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
       greenProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 4, "e-4") => }
@@ -210,27 +210,27 @@ class EventsByTagMigrationSpec extends 
AbstractEventsByTagMigrationSpec {
       greenProbe.cancel()
 
       val orangeSrc: Source[EventEnvelope, NotUsed] = 
queries.eventsByTag("orange", NoOffset)
-      val orangeProbe = orangeSrc.runWith(TestSink.probe[Any])
+      val orangeProbe = orangeSrc.runWith(TestSink[Any]())
       orangeProbe.request(3)
       orangeProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
       orangeProbe.expectNoMessage(waitTime)
       orangeProbe.cancel()
 
       val bananaSrc: Source[EventEnvelope, NotUsed] = 
queries.eventsByTag("banana", NoOffset)
-      val bananaProbe = bananaSrc.runWith(TestSink.probe[Any])
+      val bananaProbe = bananaSrc.runWith(TestSink[Any]())
       bananaProbe.request(3)
       bananaProbe.expectNoMessage(waitTime)
       bananaProbe.cancel()
 
       val redSrc: Source[EventEnvelope, NotUsed] = queries.eventsByTag("red", 
NoOffset)
-      val redProbe = redSrc.runWith(TestSink.probe[Any])
+      val redProbe = redSrc.runWith(TestSink[Any]())
       redProbe.request(3)
       redProbe.expectNextPF { case EventEnvelope(_, `pidWithSnapshot`, 10, 
"h-1") => }
       redProbe.expectNextPF { case EventEnvelope(_, `pidWithSnapshot`, 11, 
"h-2") => }
       redProbe.cancel()
 
       val excludedSrc: Source[EventEnvelope, NotUsed] = 
queries.eventsByTag("bad-tag", NoOffset)
-      val excludedProbe = excludedSrc.runWith(TestSink.probe[Any])
+      val excludedProbe = excludedSrc.runWith(TestSink[Any]())
       excludedProbe.request(1)
       excludedProbe.expectNoMessage(waitTime)
       excludedProbe.cancel()
@@ -242,7 +242,7 @@ class EventsByTagMigrationSpec extends 
AbstractEventsByTagMigrationSpec {
       probe.expectMsg(RecoveryCompleted)
 
       val blueSrc: Source[EventEnvelope, NotUsed] = 
queries.eventsByTag("blue", NoOffset)
-      val blueProbe = blueSrc.runWith(TestSink.probe[Any])
+      val blueProbe = blueSrc.runWith(TestSink[Any]())
       blueProbe.request(6)
       blueProbe.expectNextN(5) // ignore the ones we've already validated
       // This event wasn't migrated, should have been fixed on actor start up
@@ -251,7 +251,7 @@ class EventsByTagMigrationSpec extends 
AbstractEventsByTagMigrationSpec {
       blueProbe.cancel()
 
       val greenSrc: Source[EventEnvelope, NotUsed] = 
queries.eventsByTag("green", NoOffset)
-      val greenProbe = greenSrc.runWith(TestSink.probe[Any])
+      val greenProbe = greenSrc.runWith(TestSink[Any]())
       greenProbe.request(6)
       greenProbe.expectNextN(3) // ignore the ones we've already validated
       // This event wasn't migrated, should have been fixed on actor start up
@@ -284,7 +284,7 @@ class EventsByTagMigrationSpec extends 
AbstractEventsByTagMigrationSpec {
       expectMsg(Ack)
 
       val blueSrc: Source[EventEnvelope, NotUsed] = 
queriesTwo.eventsByTag("blue", NoOffset)
-      val blueProbe = 
blueSrc.runWith(TestSink.probe[Any])(SystemMaterializer(systemTwo).materializer)
+      val blueProbe = 
blueSrc.runWith(TestSink[Any]())(SystemMaterializer(systemTwo).materializer)
       blueProbe.request(10)
       blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
       blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 2, "e-2") => }
@@ -317,7 +317,7 @@ class EventsByTagMigrationSpec extends 
AbstractEventsByTagMigrationSpec {
       expectMsg(Ack)
 
       val orangeSrc: Source[EventEnvelope, NotUsed] = 
queriesThree.eventsByTag("orange", NoOffset)
-      val orangeProbe = 
orangeSrc.runWith(TestSink.probe[Any])(SystemMaterializer(systemThree).materializer)
+      val orangeProbe = 
orangeSrc.runWith(TestSink[Any]())(SystemMaterializer(systemThree).materializer)
       orangeProbe.request(3)
       orangeProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
       orangeProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 5, 
"new-event-1") => }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRecoverySpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRecoverySpec.scala
index 9a724e2..99cab69 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRecoverySpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRecoverySpec.scala
@@ -69,7 +69,7 @@ class EventsByTagRecoverySpec extends 
CassandraSpec(EventsByTagRecoverySpec.conf
         p1take2 ! PoisonPill
 
         val greenTags = queryJournal.eventsByTag(tag = "blue", offset = 
NoOffset)
-        val probe = greenTags.runWith(TestSink.probe[Any](system))
+        val probe = greenTags.runWith(TestSink[Any]()(system))
         probe.request(9)
         (1 to 8).foreach { i =>
           val event = s"e-$i"
@@ -87,7 +87,7 @@ class EventsByTagRecoverySpec extends 
CassandraSpec(EventsByTagRecoverySpec.conf
         }
 
         val greenTagsTake2 = queryJournal.eventsByTag(tag = "blue", offset = 
NoOffset)
-        val probeTake2 = greenTagsTake2.runWith(TestSink.probe[Any](system))
+        val probeTake2 = greenTagsTake2.runWith(TestSink[Any]()(system))
         probeTake2.request(13)
         (1 to 12).foreach { i =>
           val event = s"e-$i"
@@ -125,7 +125,7 @@ class EventsByTagRecoverySpec extends 
CassandraSpec(EventsByTagRecoverySpec.conf
         val queryJournal =
           
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
         val greenTags = queryJournal.eventsByTag(tag = "red", offset = 
NoOffset)
-        val probe = greenTags.runWith(TestSink.probe[Any](system))
+        val probe = greenTags.runWith(TestSink[Any]()(system))
         probe.request(9)
         (1 to 8).foreach { i =>
           val event = s"e-$i"
@@ -171,7 +171,7 @@ class EventsByTagRecoverySpec extends 
CassandraSpec(EventsByTagRecoverySpec.conf
         val queryJournal =
           
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
         val greenTags = queryJournal.eventsByTag(tag = "red", offset = 
NoOffset)
-        val probe = greenTags.runWith(TestSink.probe[Any](system))
+        val probe = greenTags.runWith(TestSink[Any]()(system))
         probe.request(13)
         (1 to 12).foreach { i =>
           val event = s"e-$i"
@@ -209,7 +209,7 @@ class EventsByTagRecoverySpec extends 
CassandraSpec(EventsByTagRecoverySpec.conf
         val queryJournal =
           
PersistenceQuery(systemTwo).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
         val blueTags = queryJournal.eventsByTag(tag = "blue", offset = 
NoOffset)
-        val probe = blueTags.runWith(TestSink.probe[Any](systemTwo))
+        val probe = blueTags.runWith(TestSink[Any]()(systemTwo))
         probe.request(6)
         (1 to 5).foreach { i =>
           val event = s"e-$i"
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRestartSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRestartSpec.scala
index cc3fcf5..3f4eb63 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRestartSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagRestartSpec.scala
@@ -74,7 +74,7 @@ class EventsByTagRestartSpec extends 
CassandraSpec(EventsByTagRestartSpec.config
       }
 
       val blueTags = queryJournal.eventsByTag(tag, offset = NoOffset)
-      val tagProbe = blueTags.runWith(TestSink.probe[Any](system))
+      val tagProbe = blueTags.runWith(TestSink[Any]()(system))
       (0 until restarts).foreach { restart =>
         tagProbe.request(messagesPerRestart + 1)
         (1 to messagesPerRestart).foreach { i =>
@@ -109,7 +109,7 @@ class EventsByTagRestartSpec extends 
CassandraSpec(EventsByTagRestartSpec.config
       deathProbe.expectTerminated(p2)
 
       val greenTags = queryJournal.eventsByTag(tag, offset = NoOffset)
-      val tagProbe = greenTags.runWith(TestSink.probe[Any](system))
+      val tagProbe = greenTags.runWith(TestSink[Any]()(system))
       tagProbe.request(10)
       (1 to 3).foreach { n =>
         val event = s"e$n"
@@ -161,7 +161,7 @@ class EventsByTagRestartSpec extends 
CassandraSpec(EventsByTagRestartSpec.config
       probe2.expectMsg(Ack)
 
       val greenTags = queryJournal.eventsByTag(tag, offset = NoOffset)
-      val tagProbe = greenTags.runWith(TestSink.probe[Any](system))
+      val tagProbe = greenTags.runWith(TestSink[Any]()(system))
       tagProbe.request(10)
       // without the fix this would not complete because e4 will have tagSeqNr 
1 rather than the expected 4
       (1 to 6).foreach { n =>
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagStressSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagStressSpec.scala
index 5349aa8..08c4c86 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagStressSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/EventsByTagStressSpec.scala
@@ -50,7 +50,7 @@ class EventsByTagStressSpec extends CassandraSpec(s"""
           .map(i => {
             (i.persistenceId, i.event.asInstanceOf[Int])
           })
-          .runWith(TestSink.probe)
+          .runWith(TestSink())
         (i, probe)
       }
 
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsSpec.scala
index 4a8c7dc..fa5e0e7 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsSpec.scala
@@ -77,14 +77,14 @@ class AllPersistenceIdsSpec extends 
CassandraSpec(AllPersistenceIdsSpec.config)
       setup("c", 1)
 
       val src = current()
-      src.runWith(TestSink.probe[Any]).request(4).expectNextUnordered("a", 
"b", "c").expectComplete()
+      src.runWith(TestSink[Any]()).request(4).expectNextUnordered("a", "b", 
"c").expectComplete()
     }
 
     "deliver persistenceId only once if there are multiple events spanning 
partitions" in {
       setup("d", 100)
 
       val src = current()
-      
src.runWith(TestSink.probe[Any]).request(10).expectNext("d").expectComplete()
+      src.runWith(TestSink[Any]()).request(10).expectNext("d").expectComplete()
     }
 
     "find existing persistence ids in batches if there is more of them than 
max-result-size-query" in {
@@ -93,7 +93,7 @@ class AllPersistenceIdsSpec extends 
CassandraSpec(AllPersistenceIdsSpec.config)
       }
 
       val src = current()
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
       probe.request(1000)
 
       for (_ <- 1 to 1000) {
@@ -110,7 +110,7 @@ class AllPersistenceIdsSpec extends 
CassandraSpec(AllPersistenceIdsSpec.config)
       setup("f", 1)
 
       val src = all()
-      val probe = 
src.runWith(TestSink.probe[Any]).request(5).expectNextUnordered("e", "f")
+      val probe = 
src.runWith(TestSink[Any]()).request(5).expectNextUnordered("e", "f")
 
       setup("g", 1)
 
@@ -121,7 +121,7 @@ class AllPersistenceIdsSpec extends 
CassandraSpec(AllPersistenceIdsSpec.config)
       setup("h", 1)
       setup("i", 1)
       val src = all()
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
 
       probe.request(1)
       probe.expectNext()
@@ -142,7 +142,7 @@ class AllPersistenceIdsSpec extends 
CassandraSpec(AllPersistenceIdsSpec.config)
       setup("o", 1)
 
       val src = all()
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
       probe.request(2)
       probe.expectNext()
       probe.expectNext()
@@ -158,7 +158,7 @@ class AllPersistenceIdsSpec extends 
CassandraSpec(AllPersistenceIdsSpec.config)
       setup("p", 1000)
 
       val src = all()
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
 
       probe.request(10).expectNext("p").expectNoMessage(1000.millis)
 
@@ -175,7 +175,7 @@ class AllPersistenceIdsSpec extends 
CassandraSpec(AllPersistenceIdsSpec.config)
       setup("c2", 1)
 
       val src = 
queries.currentPersistenceIdsFromMessages().filterNot(_.startsWith("persistenceInit"))
-      src.runWith(TestSink.probe[Any]).request(4).expectNextUnordered("a2", 
"b2", "c2").expectComplete()
+      src.runWith(TestSink[Any]()).request(4).expectNextUnordered("a2", "b2", 
"c2").expectComplete()
     }
   }
 }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/CassandraQueryJournalOverrideSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/CassandraQueryJournalOverrideSpec.scala
index 4808542..02888a0 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/CassandraQueryJournalOverrideSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/CassandraQueryJournalOverrideSpec.scala
@@ -59,20 +59,20 @@ class CassandraQueryJournalOverrideSpec extends 
CassandraSpec(CassandraQueryJour
       expectMsg(Ack)
 
       val currentEvents = journal.currentEventsByPersistenceId(pid, 0, 
Long.MaxValue)
-      val currentProbe = 
currentEvents.map(_.event.toString).runWith(TestSink.probe[String])
+      val currentProbe = 
currentEvents.map(_.event.toString).runWith(TestSink[String]())
       currentProbe.request(2)
       currentProbe.expectNext("cat")
       currentProbe.expectComplete()
 
       val liveEvents = journal.eventsByPersistenceId(pid, 0, Long.MaxValue)
-      val liveProbe = 
liveEvents.map(_.event.toString).runWith(TestSink.probe[String])
+      val liveProbe = 
liveEvents.map(_.event.toString).runWith(TestSink[String]())
       liveProbe.request(2)
       liveProbe.expectNext("cat")
       liveProbe.expectNoMessage(100.millis)
       liveProbe.cancel()
 
       val internalEvents = journal.eventsByPersistenceIdWithControl(pid, 0, 
Long.MaxValue, None)
-      val internalProbe = 
internalEvents.map(_.event.toString).runWith(TestSink.probe[String])
+      val internalProbe = 
internalEvents.map(_.event.toString).runWith(TestSink[String]())
       internalProbe.request(2)
       internalProbe.expectNext("cat")
       liveProbe.expectNoMessage(100.millis)
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventAdaptersReadSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventAdaptersReadSpec.scala
index c468c62..bde3307 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventAdaptersReadSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventAdaptersReadSpec.scala
@@ -78,7 +78,7 @@ class EventAdaptersReadSpec extends 
CassandraSpec(EventAdaptersReadSpec.config)
       val src = queries.currentEventsByPersistenceId("a", 0L, Long.MaxValue)
       src
         .map(_.event)
-        .runWith(TestSink.probe[Any])
+        .runWith(TestSink[Any]())
         .request(2)
         .expectNext("a-1", "a-3")
         .expectNoMessage(500.millis)
@@ -96,7 +96,7 @@ class EventAdaptersReadSpec extends 
CassandraSpec(EventAdaptersReadSpec.config)
         })
 
       val src = queries.currentEventsByPersistenceId("b", 0L, Long.MaxValue)
-      
src.map(_.event).runWith(TestSink.probe[Any]).request(10).expectNext("b-1", 
"b-2", "b-2", "b-3").expectComplete()
+      src.map(_.event).runWith(TestSink[Any]()).request(10).expectNext("b-1", 
"b-2", "b-2", "b-3").expectComplete()
     }
 
     "duplicate events with prefix added by the event-adapter" in {
@@ -104,7 +104,7 @@ class EventAdaptersReadSpec extends 
CassandraSpec(EventAdaptersReadSpec.config)
       setup("c", 1, _ => "prefixed:foo:")
 
       val src = queries.currentEventsByPersistenceId("c", 0L, Long.MaxValue)
-      
src.map(_.event).runWith(TestSink.probe[Any]).request(10).expectNext("foo-c-1").expectComplete()
+      
src.map(_.event).runWith(TestSink[Any]()).request(10).expectNext("foo-c-1").expectComplete()
     }
 
   }
@@ -119,7 +119,7 @@ class EventAdaptersReadSpec extends 
CassandraSpec(EventAdaptersReadSpec.config)
         })
 
       val src = queries.eventsByTag("red", NoOffset)
-      val sub = src.map(_.event).runWith(TestSink.probe[Any])
+      val sub = src.map(_.event).runWith(TestSink[Any]())
       sub.request(10)
       sub.expectNext("d-1", "d-3", "d-5")
       sub.expectNoMessage(waitTime)
@@ -135,7 +135,7 @@ class EventAdaptersReadSpec extends 
CassandraSpec(EventAdaptersReadSpec.config)
         })
 
       val src = queries.eventsByTag("yellow", NoOffset)
-      val sub = src.map(_.event).runWith(TestSink.probe[Any])
+      val sub = src.map(_.event).runWith(TestSink[Any]())
 
       sub.request(10).expectNext("e-1", "e-2", "e-2", 
"e-3").expectNoMessage(waitTime)
       sub.cancel()
@@ -150,7 +150,7 @@ class EventAdaptersReadSpec extends 
CassandraSpec(EventAdaptersReadSpec.config)
         })
 
       val src = queries.eventsByTag("green", NoOffset)
-      val sub = src.map(_.event).runWith(TestSink.probe[Any])
+      val sub = src.map(_.event).runWith(TestSink[Any]())
       sub.request(10).expectNext("e-1", "foo-e-2", 
"e-3").expectNoMessage(waitTime)
       sub.cancel()
     }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala
index 5e6403f..83efead 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala
@@ -47,7 +47,7 @@ class EventsByPersistenceIdFastForwardSpec
     writeTestEvent(evt1)
 
     val src = queries.eventsByPersistenceIdWithControl("f", 0L, Long.MaxValue)
-    val (futureControl, probe) = 
src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+    val (futureControl, probe) = 
src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
     val control = futureControl.futureValue
     probe.request(5)
 
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala
index 3a8b5ce..4fa46ce 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala
@@ -58,7 +58,7 @@ class EventsByPersistenceIdMultiPartitionGapSpec
     writeTestEvent(pr5, partitionNr = 4L)
 
     val src = queries.currentEventsByPersistenceId("mpg1", 0L, Long.MaxValue)
-    val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+    val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
 
     probe.expectNext("e1")
     probe.expectNext("e5")
@@ -84,7 +84,7 @@ class EventsByPersistenceIdMultiPartitionGapSpec
     writeTestEvent(pr5, partitionNr = 4L)
 
     val src = queries.currentEventsByPersistenceId("mpg2", 0L, Long.MaxValue)
-    val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+    val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
 
     probe.expectNext("e1")
     probe.expectNext("e2")
@@ -111,7 +111,7 @@ class EventsByPersistenceIdMultiPartitionGapSpec
     writeTestEvent(pr5, partitionNr = 4L)
 
     val src = queries.currentEventsByPersistenceId("mpg3", 0L, Long.MaxValue)
-    val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+    val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
 
     probe.expectNext("e1")
     probe.expectNext("e2")
@@ -154,7 +154,7 @@ class EventsByPersistenceIdMultiPartitionGapSpec
     writeTestEvent(pr9, partitionNr = 6L)
 
     val src = queries.currentEventsByPersistenceId("mpg4", 0L, Long.MaxValue)
-    val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+    val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
 
     probe.expectNext("e1")
     probe.expectNext("e2")
@@ -183,7 +183,7 @@ class EventsByPersistenceIdMultiPartitionGapSpec
     writeTestEvent(pr4, partitionNr = 3L)
 
     val src = queries.currentEventsByPersistenceId("mpg5", 0L, Long.MaxValue)
-    val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+    val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
 
     probe.expectNext("e3")
     probe.expectNext("e4")
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
index 376f3eb..332ebbc 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
@@ -59,7 +59,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       val src = queries.currentEventsByPersistenceId("a", 0L, Long.MaxValue)
       src
         .map(_.event)
-        .runWith(TestSink.probe[Any])
+        .runWith(TestSink[Any]())
         .request(2)
         .expectNext("a-1", "a-2")
         .expectNoMessage(noMsgTimeout)
@@ -72,19 +72,19 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       setup("b", 10)
       val src = queries.currentEventsByPersistenceId("b", 5L, Long.MaxValue)
 
-      
src.map(_.sequenceNr).runWith(TestSink.probe[Any]).request(7).expectNext(5, 6, 
7, 8, 9, 10).expectComplete()
+      src.map(_.sequenceNr).runWith(TestSink[Any]()).request(7).expectNext(5, 
6, 7, 8, 9, 10).expectComplete()
     }
 
     "not see any events if the stream starts after current latest event" in {
       setup("c", 3)
       val src = queries.currentEventsByPersistenceId("c", 5L, Long.MaxValue)
-      src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectComplete()
+      src.map(_.event).runWith(TestSink[Any]()).request(5).expectComplete()
     }
 
     "find existing events up to a sequence number" in {
       setup("d", 3)
       val src = queries.currentEventsByPersistenceId("d", 0L, 2L)
-      
src.map(_.sequenceNr).runWith(TestSink.probe[Any]).request(5).expectNext(1, 
2).expectComplete()
+      src.map(_.sequenceNr).runWith(TestSink[Any]()).request(5).expectNext(1, 
2).expectComplete()
     }
 
     "not see new events after demand request" in {
@@ -92,7 +92,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
 
       val src = queries.currentEventsByPersistenceId("e", 0L, Long.MaxValue)
       val probe =
-        
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("e-1", 
"e-2").expectNoMessage(noMsgTimeout)
+        src.map(_.event).runWith(TestSink[Any]()).request(2).expectNext("e-1", 
"e-2").expectNoMessage(noMsgTimeout)
 
       ref ! "e-4"
       expectMsg("e-4-done")
@@ -105,7 +105,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
 
       val src = queries.currentEventsByPersistenceId("f", 0L, Long.MaxValue)
       val probe =
-        
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("f-1", 
"f-2").expectNoMessage(noMsgTimeout)
+        src.map(_.event).runWith(TestSink[Any]()).request(2).expectNext("f-1", 
"f-2").expectNoMessage(noMsgTimeout)
 
       probe
         .expectNoMessage(noMsgTimeout)
@@ -119,7 +119,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
     "stop if there are no events" in {
       val src = queries.currentEventsByPersistenceId("g", 0L, Long.MaxValue)
 
-      src.runWith(TestSink.probe[Any]).request(2).expectComplete()
+      src.runWith(TestSink[Any]()).request(2).expectComplete()
     }
 
     "produce correct sequence of sequence numbers" in {
@@ -128,7 +128,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       val src = queries.currentEventsByPersistenceId("h", 0L, Long.MaxValue)
       src
         .map(x => (x.persistenceId, x.sequenceNr))
-        .runWith(TestSink.probe[Any])
+        .runWith(TestSink[Any]())
         .request(4)
         .expectNext(("h", 1), ("h", 2), ("h", 3))
         .expectComplete()
@@ -152,7 +152,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       val src = queries.currentEventsByPersistenceId("i", 0L, Long.MaxValue)
       src
         .map(_.event)
-        .runWith(TestSink.probe[Any])
+        .runWith(TestSink[Any]())
         .request(10)
         .expectNextN((1 to 10).map(i => s"i-$i"))
         .expectNoMessage(noMsgTimeout)
@@ -170,7 +170,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
         .eventsByPersistenceId("jo", 0L, Long.MaxValue)
         .viaMat(KillSwitches.single)(Keep.right)
         .map(x => (x.event, x.sequenceNr, x.offset))
-        .toMat(TestSink.probe[Any])(Keep.both)
+        .toMat(TestSink[Any]())(Keep.both)
         .run()
 
       probe.request(5)
@@ -234,7 +234,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       val src = queries.currentEventsByPersistenceId("i2", 0L, Long.MaxValue)
       src
         .map(_.event)
-        .runWith(TestSink.probe[Any])
+        .runWith(TestSink[Any]())
         .request(100)
         .expectNextN((1 to 15).map(i => s"i2-$i"))
         .expectComplete()
@@ -257,7 +257,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       val src = queries.currentEventsByPersistenceId("i3", 0L, Long.MaxValue)
       src
         .map(_.event)
-        .runWith(TestSink.probe[Any])
+        .runWith(TestSink[Any]())
         .request(10)
         .expectNextN((1 to 10).map(i => s"i3-$i"))
         .expectNoMessage(noMsgTimeout)
@@ -273,7 +273,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       ref ! TestActor.DeleteTo(48)
       expectMsg(DeleteMessagesSuccess(48))
       val src = queries.currentEventsByPersistenceId("i4", 0, Long.MaxValue)
-      
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("i4-49", 
"i4-50").expectComplete()
+      src.map(_.event).runWith(TestSink[Any]()).request(2).expectNext("i4-49", 
"i4-50").expectComplete()
     }
 
     "detect gaps" in {
@@ -286,7 +286,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       writeTestEvent(pr4)
 
       val src = queries.currentEventsByPersistenceId("i5", 0L, Long.MaxValue)
-      val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+      val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
 
       // timeout dictated by events-by-persistence-id-gap-timeout above
       probe.within(7.seconds) {
@@ -309,7 +309,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       val pr1 = PersistentRepr("e1", 1L, "with-meta", "").withMetadata(meta)
       writeTestEvent(pr1)
       val src = queries.currentEventsByPersistenceId("with-meta", 0L, Long.MaxValue)
-      src.map(_.eventMetadata).runWith(TestSink.probe[Any]).request(2).expectNext(Some(meta)).expectComplete()
+      src.map(_.eventMetadata).runWith(TestSink[Any]()).request(2).expectNext(Some(meta)).expectComplete()
     }
   }
 
@@ -318,7 +318,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
     "find new events" in {
       val ref = setup("j", 3)
       val src = queries.eventsByPersistenceId("j", 0L, Long.MaxValue)
-      val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("j-1", "j-2", "j-3")
+      val probe = src.map(_.event).runWith(TestSink[Any]()).request(5).expectNext("j-1", "j-2", "j-3")
 
       ref ! "j-4"
       expectMsg("j-4-done")
@@ -330,7 +330,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
     "find new events if the stream starts after current latest event" in {
       val ref = setup("k", 4)
       val src = queries.eventsByPersistenceId("k", 5L, Long.MaxValue)
-      val probe = 
src.map(_.sequenceNr).runWith(TestSink.probe[Any]).request(5).expectNoMessage(noMsgTimeout)
+      val probe = 
src.map(_.sequenceNr).runWith(TestSink[Any]()).request(5).expectNoMessage(noMsgTimeout)
 
       ref ! "k-5"
       expectMsg("k-5-done")
@@ -349,7 +349,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
     "find new events up to a sequence number" in {
       val ref = setup("l", 3)
       val src = queries.eventsByPersistenceId("l", 0L, 4L)
-      val probe = 
src.map(_.sequenceNr).runWith(TestSink.probe[Any]).request(5).expectNext(1, 2, 
3)
+      val probe = 
src.map(_.sequenceNr).runWith(TestSink[Any]()).request(5).expectNext(1, 2, 3)
 
       ref ! "l-4"
       expectMsg("l-4-done")
@@ -361,7 +361,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       val ref = setup("m", 3)
       val src = queries.eventsByPersistenceId("m", 0L, Long.MaxValue)
       val probe =
-        
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("m-1", 
"m-2").expectNoMessage(noMsgTimeout)
+        src.map(_.event).runWith(TestSink[Any]()).request(2).expectNext("m-1", 
"m-2").expectNoMessage(noMsgTimeout)
 
       ref ! "m-4"
       expectMsg("m-4-done")
@@ -376,7 +376,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
 
       val src = queries.eventsByPersistenceId("n", 0L, Long.MaxValue)
       val probe =
-        
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("n-1", 
"n-2").expectNoMessage(noMsgTimeout)
+        src.map(_.event).runWith(TestSink[Any]()).request(2).expectNext("n-1", 
"n-2").expectNoMessage(noMsgTimeout)
 
       probe
         .expectNoMessage(noMsgTimeout)
@@ -393,7 +393,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       setup("o2", 1) // Database init.
       val src = queries.eventsByPersistenceId("o", 0L, Long.MaxValue)
 
-      val probe = 
src.map(_.event).runWith(TestSink.probe[Any]).request(10).expectNoMessage(noMsgTimeout)
+      val probe = 
src.map(_.event).runWith(TestSink[Any]()).request(10).expectNoMessage(noMsgTimeout)
 
       probe.cancel()
     }
@@ -402,7 +402,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       setup("p2", 1) // Database init.
       val src = queries.eventsByPersistenceId("p", 0L, Long.MaxValue)
 
-      val probe = 
src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNoMessage(noMsgTimeout)
+      val probe = 
src.map(_.event).runWith(TestSink[Any]()).request(2).expectNoMessage(noMsgTimeout)
 
       setup("p", 2)
 
@@ -418,7 +418,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
 
       val probe = src
         .map(_.event)
-        .runWith(TestSink.probe[Any])
+        .runWith(TestSink[Any]())
         .request(16)
         .expectNextN((1 to 15).map(i => s"q-$i"))
         .expectNoMessage(noMsgTimeout)
@@ -457,7 +457,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       val src = queries.eventsByPersistenceId("q2", 0L, Long.MaxValue)
       val probe = src
         .map(_.event)
-        .runWith(TestSink.probe[Any])
+        .runWith(TestSink[Any]())
         .request(10)
         .expectNextN((1 to 10).map(i => s"q2-$i"))
         .expectNoMessage(noMsgTimeout)
@@ -494,7 +494,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       writeTestEvent(pr4)
 
       val src = queries.currentEventsByPersistenceId("gap1", 0L, Long.MaxValue)
-      val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+      val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
 
       probe.expectNext("e1")
       probe.expectNext("e2")
@@ -514,7 +514,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
       writeTestEvent(pr4)
 
       val src = queries.currentEventsByPersistenceId("gap2", 0L, Long.MaxValue)
-      val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(10)
+      val probe = src.map(_.event).runWith(TestSink[Any]()).request(10)
 
       probe.expectNext("e1")
       probe.expectNext("e2")
@@ -529,7 +529,7 @@ class EventsByPersistenceIdSpec extends 
CassandraSpec(EventsByPersistenceIdSpec.
 
     "not complete when empty" in {
       val src = queries.eventsByPersistenceId("r", 0L, Long.MaxValue)
-      val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5)
+      val probe = src.map(_.event).runWith(TestSink[Any]()).request(5)
 
       probe.expectNoMessage(100.millis)
       probe.cancel()
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala
index e4e2dfb..292c77c 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala
@@ -63,7 +63,7 @@ class EventsByPersistenceIdWithControlSpec extends CassandraSpec(EventsByPersist
 
       val src = queries.eventsByPersistenceIdWithControl("a", 0L, 
Long.MaxValue)
 
-      val (futureControl, probe) = src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+      val (futureControl, probe) = src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
       val control = futureControl.futureValue
       control.poll(3)
 
@@ -82,7 +82,7 @@ class EventsByPersistenceIdWithControlSpec extends 
CassandraSpec(EventsByPersist
       val ref = setup("b", 8)
 
       val src = queries.eventsByPersistenceIdWithControl("b", 0L, 
Long.MaxValue)
-      val (futureControl, probe) = 
src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+      val (futureControl, probe) = 
src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
       val control = futureControl.futureValue
       control.poll(8)
 
@@ -103,7 +103,7 @@ class EventsByPersistenceIdWithControlSpec extends 
CassandraSpec(EventsByPersist
       val ref = setup("c", 2)
 
       val src = queries.eventsByPersistenceIdWithControl("c", 0L, 
Long.MaxValue)
-      val (futureControl, probe) = 
src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+      val (futureControl, probe) = 
src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
       val control = futureControl.futureValue
       control.poll(2)
 
@@ -131,7 +131,7 @@ class EventsByPersistenceIdWithControlSpec extends 
CassandraSpec(EventsByPersist
       val ref = setup("d", 12)
 
       val src = queries.eventsByPersistenceIdWithControl("d", 0L, 
Long.MaxValue)
-      val (futureControl, probe) = 
src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+      val (futureControl, probe) = 
src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
       val control = futureControl.futureValue
       control.poll(12)
 
@@ -156,7 +156,7 @@ class EventsByPersistenceIdWithControlSpec extends 
CassandraSpec(EventsByPersist
       setup("e", 35)
 
       val src = queries.eventsByPersistenceIdWithControl("e", 0L, 
Long.MaxValue)
-      val (futureControl, probe) = 
src.map(_.event).toMat(TestSink.probe[Any])(Keep.both).run()
+      val (futureControl, probe) = 
src.map(_.event).toMat(TestSink[Any]())(Keep.both).run()
       val control = futureControl.futureValue
       control.poll(35)
 
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagPubsubSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagPubsubSpec.scala
index 8b0730b..46a6652 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagPubsubSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagPubsubSpec.scala
@@ -62,7 +62,7 @@ class EventsByTagPubsubSpec extends 
CassandraSpec(EventsByTagPubsubSpec.config)
       val actor = system.actorOf(TestActor.props("EventsByTagPubsubSpec_a"))
 
       val blackSrc = queries.eventsByTag(tag = "black", offset = NoOffset)
-      val probe = blackSrc.runWith(TestSink.probe[Any])
+      val probe = blackSrc.runWith(TestSink[Any]())
       probe.request(2)
       probe.expectNoMessage(300.millis)
 
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
index e9ef11b..ddda87f 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
@@ -209,7 +209,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
       expectMsg(s"a green leaf-done")
 
       val greenSrc = queries.currentEventsByTag(tag = "green", offset = 
NoOffset)
-      val probe = greenSrc.runWith(TestSink.probe[Any])
+      val probe = greenSrc.runWith(TestSink[Any]())
       probe.request(2)
       probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
       probe.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
@@ -219,13 +219,13 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
       probe.expectComplete()
 
       val blackSrc = queries.currentEventsByTag(tag = "black", offset = 
NoOffset)
-      val probe2 = blackSrc.runWith(TestSink.probe[Any])
+      val probe2 = blackSrc.runWith(TestSink[Any]())
       probe2.request(5)
       probe2.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "a black car") 
=> e }
       probe2.expectComplete()
 
       val appleSrc = queries.currentEventsByTag(tag = "apple", offset = 
NoOffset)
-      val probe3 = appleSrc.runWith(TestSink.probe[Any])
+      val probe3 = appleSrc.runWith(TestSink[Any]())
       probe3.request(5)
       probe3.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green 
apple") => e }
       probe3.expectComplete()
@@ -233,7 +233,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
 
     "complete when no events" in {
       val src = queries.currentEventsByTag(tag = "pink", offset = NoOffset)
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
       probe.request(2)
       probe.expectComplete()
     }
@@ -242,7 +242,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
       val c = system.actorOf(TestActor.props("c"))
 
       val greenSrc = queries.currentEventsByTag(tag = "green", offset = 
NoOffset)
-      val probe = greenSrc.runWith(TestSink.probe[Any])
+      val probe = greenSrc.runWith(TestSink[Any]())
       probe.request(2)
       probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") 
=> e }
       probe.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green 
banana") => e }
@@ -259,7 +259,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
 
     "find events from timestamp offset" in {
       val greenSrc1 = queries.currentEventsByTag(tag = "green", offset = 
NoOffset)
-      val probe1 = greenSrc1.runWith(TestSink.probe[Any])
+      val probe1 = greenSrc1.runWith(TestSink[Any]())
       probe1.request(2)
       val appleOffs = probe1
         .expectNextPF {
@@ -280,7 +280,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
       appleTimestamp should be <= bananaTimestamp
 
       val greenSrc2 = queries.currentEventsByTag(tag = "green", 
queries.timeBasedUUIDFrom(bananaTimestamp))
-      val probe2 = greenSrc2.runWith(TestSink.probe[Any])
+      val probe2 = greenSrc2.runWith(TestSink[Any]())
       probe2.request(10)
       if (appleTimestamp == bananaTimestamp)
         probe2.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green 
apple") => e }
@@ -291,14 +291,14 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
 
     "find events from UUID offset" in {
       val greenSrc1 = queries.currentEventsByTag(tag = "green", offset = 
NoOffset)
-      val probe1 = greenSrc1.runWith(TestSink.probe[Any])
+      val probe1 = greenSrc1.runWith(TestSink[Any]())
       probe1.request(2)
       probe1.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green 
apple") => e }
       val offs = probe1.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a 
green banana") => e }.offset
       probe1.cancel()
 
       val greenSrc2 = queries.currentEventsByTag(tag = "green", offs)
-      val probe2 = greenSrc2.runWith(TestSink.probe[Any])
+      val probe2 = greenSrc2.runWith(TestSink[Any]())
       probe2.request(10)
       probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") 
=> e }
       probe2.cancel()
@@ -307,7 +307,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
     "include timestamp in EventEnvelope" in {
       val currentTime = System.currentTimeMillis()
       val greenSrc1 = queries.currentEventsByTag(tag = "green", offset = 
NoOffset)
-      val probe1 = greenSrc1.runWith(TestSink.probe[EventEnvelope])
+      val probe1 = greenSrc1.runWith(TestSink[EventEnvelope]())
       probe1.request(2)
 
       val env1 = probe1.expectNext()
@@ -341,7 +341,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
       writeTaggedEvent(t4, pr4, Set("T1-current"), 4, bucketSize)
 
       val src = queries.currentEventsByTag(tag = "T1-current", offset = 
NoOffset)
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
       probe.request(2)
       probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
       probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
@@ -360,7 +360,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
 
     "find new events" in {
       val d = system.actorOf(TestActor.props("d"))
-      withProbe(queries.eventsByTag(tag = "black", offset = NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "black", offset = NoOffset).runWith(TestSink[Any]()),
         probe => {
           probe.request(2)
           probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "a black 
car") => e }
@@ -381,7 +381,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
 
     "find events from timestamp offset" in {
       withProbe(
-        queries.eventsByTag(tag = "green", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+        queries.eventsByTag(tag = "green", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe1 => {
           probe1.request(2)
           val appleOffs = probe1
@@ -401,7 +401,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
           bananaTimestamp should be <= bananaTimestamp
 
           withProbe(
-            queries.eventsByTag(tag = "green", 
queries.timeBasedUUIDFrom(bananaTimestamp)).runWith(TestSink.probe[Any]),
+            queries.eventsByTag(tag = "green", 
queries.timeBasedUUIDFrom(bananaTimestamp)).runWith(TestSink[Any]()),
             probe2 => {
               probe2.request(10)
               if (appleTimestamp == bananaTimestamp)
@@ -416,7 +416,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
     }
 
     "find events from UUID offset " in {
-      withProbe(queries.eventsByTag(tag = "green", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "green", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe1 => {
           probe1.request(2)
           probe1.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green 
apple") => e }
@@ -424,7 +424,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
           probe1.cancel()
 
           val greenSrc2 = queries.eventsByTag(tag = "green", offs)
-          val probe2 = greenSrc2.runWith(TestSink.probe[Any])
+          val probe2 = greenSrc2.runWith(TestSink[Any]())
           probe2.request(10)
           probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green 
leaf") => e }
           probe2.expectNextPF { case e @ EventEnvelope(_, "c", 1L, "a green 
cucumber") => e }
@@ -435,7 +435,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
     "include timestamp in EventEnvelope" in {
       val currentTime = System.currentTimeMillis()
       val greenSrc1 = queries.eventsByTag(tag = "green", offset = NoOffset)
-      val probe1 = greenSrc1.runWith(TestSink.probe[EventEnvelope])
+      val probe1 = greenSrc1.runWith(TestSink[EventEnvelope]())
       probe1.request(2)
 
       val env1 = probe1.expectNext()
@@ -462,7 +462,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
       val pr2 = PersistentRepr("e2", 2L, "p1", "", writerUuid = w1)
       writeTaggedEvent(t2, pr2, Set("T1-live"), 2, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T1-live", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "T1-live", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
           probe.request(10)
           probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
@@ -490,7 +490,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
       val pr3 = PersistentRepr("p1-e2", 2L, "p1", "", writerUuid = w1)
       writeTaggedEvent(t3, pr3, Set("T2"), 2, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T2", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "T2", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
           probe.request(10)
 
@@ -511,7 +511,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
     "stream many events" in {
       val e = system.actorOf(TestActor.props("e"))
       withProbe(
-        queries.eventsByTag(tag = "yellow", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+        queries.eventsByTag(tag = "yellow", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
 
           for (n <- 1 to 100)
@@ -541,7 +541,7 @@ class EventsByTagSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.config) {
     }
 
     "not complete for empty query" in {
-      val probe = queries.eventsByTag(tag = "empty", offset = 
NoOffset).runWith(TestSink.probe[Any])
+      val probe = queries.eventsByTag(tag = "empty", offset = 
NoOffset).runWith(TestSink[Any]())
       probe.request(2)
       probe.expectNoMessage(waitTime)
       probe.cancel()
@@ -572,7 +572,7 @@ class EventsByTagZeroEventualConsistencyDelaySpec
       expectMsg(s"a green leaf-done")
 
       val greenSrc = queries.currentEventsByTag(tag = "green", offset = 
NoOffset)
-      val probe = greenSrc.runWith(TestSink.probe[Any])
+      val probe = greenSrc.runWith(TestSink[Any]())
       probe.request(NumberOfBananas + 10L)
       probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "a green apple") 
=> e }
       (1 to NumberOfBananas).foreach { n =>
@@ -619,7 +619,7 @@ pekko.persistence.cassandra.events-by-tag.refresh-internal = 100ms
       val p2e1 = PersistentRepr("p2-e1", 1L, "p2", "", writerUuid = w2)
       writeTaggedEvent(t2, p2e1, Set("T6"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T6", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "T6", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
           probe.request(10)
           probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => 
e }
@@ -646,7 +646,7 @@ pekko.persistence.cassandra.events-by-tag.refresh-internal 
= 100ms
       val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
       writeTaggedEvent(t2, eventA1, Set("T7"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T7", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "T7", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
           probe.request(10)
           probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
@@ -676,7 +676,7 @@ pekko.persistence.cassandra.events-by-tag.refresh-internal 
= 100ms
       val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
       writeTaggedEvent(t2, eventA1, Set("T8"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T8", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "T8", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
           probe.request(10)
           probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B0") => e }
@@ -702,13 +702,13 @@ 
pekko.persistence.cassandra.events-by-tag.refresh-internal = 100ms
       val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
       writeTaggedEvent(t1.plusSeconds(2), eventA1, Set("T9"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T9", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "T9", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe1 => {
           probe1.request(10)
           val offs =
             probe1.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => 
e }.offset.asInstanceOf[TimeBasedUUID]
 
-          withProbe(queries.eventsByTag(tag = "T9", offset = 
offs).runWith(TestSink.probe[Any]),
+          withProbe(queries.eventsByTag(tag = "T9", offset = 
offs).runWith(TestSink[Any]()),
             probe2 => {
               probe2.request(10)
 
@@ -737,7 +737,7 @@ pekko.persistence.cassandra.events-by-tag.refresh-internal 
= 100ms
         writeTaggedEvent(t1.plus(n, ChronoUnit.MILLIS), eventA, Set("T10"), n, 
bucketSize)
       }
 
-      withProbe(queries.eventsByTag(tag = "T10", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "T10", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
           probe.request(1000)
           probe.expectNextN(100)
@@ -767,7 +767,7 @@ pekko.persistence.cassandra.events-by-tag.refresh-internal 
= 100ms
       val t1 = LocalDateTime.now(ZoneOffset.UTC)
 
       withProbe(
-        queries.eventsByTag(tag = "T11", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+        queries.eventsByTag(tag = "T11", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
           probe.request(1000)
 
@@ -836,7 +836,7 @@ class EventsByTagStrictBySeqNoEarlyFirstOffsetSpec
 
       // the search for delayed events should start before we get to the 
current timebucket
       // until 0.26/0.51 backtracking was broken and events would be skipped
-      withProbe(queries.eventsByTag(tag = "T11", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "T11", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
           probe.request(2000)
           probe.expectNextN(2000)
@@ -873,7 +873,7 @@ class EventsByTagLongRefreshIntervalSpec
     sender.expectNoMessage(200.millis) // try and give time for the tagged event to be flushed so the query doesn't need to wait for the refresh interval
 
     val offset: Offset =
-      withProbe(queries.eventsByTag(tag = "animal", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "animal", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
           probe.request(2)
           probe.expectNextPF {
@@ -887,7 +887,7 @@ class EventsByTagLongRefreshIntervalSpec
     // flush interval for tag writes is 0ms but still give some time for the 
tag write to complete
     sender.expectNoMessage(250.millis)
 
-    withProbe(queries.eventsByTag(tag = "animal", offset = 
offset).runWith(TestSink.probe[Any]),
+    withProbe(queries.eventsByTag(tag = "animal", offset = 
offset).runWith(TestSink[Any]()),
       probe => {
         probe.request(2)
         // less than the refresh interval, previously this would evaluate the 
new persistence-id timeout and then not re-evaluate
@@ -927,7 +927,7 @@ class EventsByTagStrictBySeqNoManyInCurrentTimeBucketSpec
       // the search for delayed events should start before we get to the 
current timebucket
       // until 0.26/0.51 backtracking was broken and events would be skipped
       val src = queries.currentEventsByTag(tag = "T12", offset = NoOffset)
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
       probe.request(2000)
       probe.expectNextN(200)
       probe.expectComplete()
@@ -962,7 +962,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends 
AbstractEventsByTagSpec(Even
       }
 
       withProbe(
-        queries.eventsByTag(tag = "T13", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+        queries.eventsByTag(tag = "T13", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
 
           val requested1 = 150L
@@ -1034,7 +1034,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends 
AbstractEventsByTagSpec(Even
           writeTaggedEvent(t2, eventB, Set("T14"), n - 112, bucketSize)
       }
 
-      withProbe(queries.eventsByTag(tag = "T14", offset = 
NoOffset).runWith(TestSink.probe[Any]),
+      withProbe(queries.eventsByTag(tag = "T14", offset = 
NoOffset).runWith(TestSink[Any]()),
         probe => {
 
           val requested1 = 130L
@@ -1073,7 +1073,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends 
AbstractEventsByTagSpec(Even
       }
 
       val src = queries.currentEventsByTag(tag = "T15", offset = NoOffset)
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
 
       (1 to 10).foreach { _ =>
         probe.request(30)
@@ -1114,7 +1114,7 @@ class EventsByTagSpecBackTrackingLongRefreshInterval
       val tagName = "back-track-previous-bucket-no-refresh"
       writeTaggedEvent(PersistentRepr("e1", 1L, "p2", ""), Set(tagName), 1, 
bucketSize)
       val src = queries.eventsByTag(tag = tagName, offset = NoOffset)
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
       probe.request(10)
       // bring the offset forward with an event for a new persistence id
       probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "e1") => e }
@@ -1160,7 +1160,7 @@ class EventsByTagSpecBackTracking
       writeTaggedEvent(t1.plusMinutes(1), PersistentRepr("e2", 2L, "p1", ""), 
Set("back-track"), 2, bucketSize)
 
       val src = queries.eventsByTag(tag = "back-track", offset = NoOffset)
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
       probe.request(10)
       probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
       probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
@@ -1191,7 +1191,7 @@ class EventsByTagSpecBackTracking
       writeTaggedEvent(t1.plusHours(1), PersistentRepr("e2", 2L, "p1", ""), 
Set(tagName), 2, bucketSize)
 
       val src = queries.eventsByTag(tag = tagName, offset = NoOffset)
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
       probe.request(10)
       probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
       probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
@@ -1219,7 +1219,7 @@ class EventsByTagSpecBackTracking
     "find new persistence ids that were missed" in {
       val tagName = "back-track-new-persistence-id"
       val src = queries.eventsByTag(tag = tagName, offset = NoOffset)
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
       probe.request(10)
       writeTaggedEvent(PersistentRepr("e1", 1L, "p1", ""), Set(tagName), 1, 
bucketSize)
       writeTaggedEvent(PersistentRepr("e2", 2L, "p1", ""), Set(tagName), 2, 
bucketSize)
@@ -1240,7 +1240,7 @@ class EventsByTagSpecBackTracking
     "sort delayed events by timeuuid" in {
       val tagName = "back-track-sort-delayed-events"
       val src = queries.eventsByTag(tag = tagName, offset = NoOffset)
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
       probe.request(10)
       writeTaggedEvent(PersistentRepr("e1", 1L, "p1", ""), Set(tagName), 1, 
bucketSize)
       probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
@@ -1265,7 +1265,7 @@ class EventsByTagSpecBackTracking
     "work for many delayed events for different persistence ids" in {
       val tagName = "back-track-many-persistence-ids"
       val src = queries.eventsByTag(tag = tagName, offset = NoOffset)
-      val probe = src.runWith(TestSink.probe[Any])
+      val probe = src.runWith(TestSink[Any]())
       probe.request(1005)
       // short period is 10m
       val start = today.minusMinutes(9)
@@ -1379,7 +1379,7 @@ class EventsByTagPersistenceIdCleanupSpec extends 
AbstractEventsByTagSpec(Events
         val query =
           queries.eventsByTag("cleanup-tag",
             
TimeBasedUUID(Uuids.startOf(t1.toInstant(ZoneOffset.UTC).toEpochMilli - 1L)))
-        val probe = query.runWith(TestSink.probe[Any])
+        val probe = query.runWith(TestSink[Any]())
         probe.request(10)
         probe.expectNextPF { case e @ EventEnvelope(_, "cleanup", 1L, 
"cleanup-1") => e }
         probe.expectNoMessage(cleanupPeriod + 250.millis)
@@ -1411,14 +1411,14 @@ class EventsByTagDisabledSpec extends 
AbstractEventsByTagSpec(EventsByTagSpec.di
 
     "fail current events by tag queries" in {
       val greenSrc = queries.currentEventsByTag(tag = "green", offset = 
NoOffset)
-      val probe = greenSrc.runWith(TestSink.probe[Any])
+      val probe = greenSrc.runWith(TestSink[Any]())
       probe.request(1)
       probe.expectError().getMessage should include("Events by tag queries are 
disabled")
     }
 
     "fail live events by tag queries" in {
       val greenSrc = queries.eventsByTag(tag = "green", offset = NoOffset)
-      val probe = greenSrc.runWith(TestSink.probe[Any])
+      val probe = greenSrc.runWith(TestSink[Any]())
       probe.request(1)
       probe.expectError().getMessage should include("Events by tag queries are 
disabled")
     }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStageSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStageSpec.scala
index adb38c4..ba266c0 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStageSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStageSpec.scala
@@ -94,7 +94,7 @@ class EventsByTagStageSpec
       send(ref, Tagged("e-4", Set("tag-2")))
 
       val stream = queries.currentEventsByTag("tag-2", NoOffset)
-      val sub = stream.toMat(TestSink.probe[EventEnvelope])(Keep.right).run()
+      val sub = stream.toMat(TestSink[EventEnvelope]())(Keep.right).run()
 
       sub.request(2)
       sub.expectNoMessage(50.millis) // eventual consistency delay prevents 
events coming right away
@@ -108,7 +108,7 @@ class EventsByTagStageSpec
     "empty tag completes" in {
       val stream: Source[EventEnvelope, NotUsed] =
         queries.currentEventsByTag("bogus", NoOffset)
-      val sub = stream.toMat(TestSink.probe[EventEnvelope])(Keep.right).run()
+      val sub = stream.toMat(TestSink[EventEnvelope]())(Keep.right).run()
       sub.request(1)
       sub.expectComplete()
     }
@@ -120,7 +120,7 @@ class EventsByTagStageSpec
       }
 
       val stream = queries.currentEventsByTag("paged", NoOffset)
-      val sub = stream.runWith(TestSink.probe[EventEnvelope])
+      val sub = stream.runWith(TestSink[EventEnvelope]())
 
       sub.request(fetchSize + 1L)
       (1L to (fetchSize + 1)).foreach { i =>
@@ -142,7 +142,7 @@ class EventsByTagStageSpec
         bucketSize)
 
       val tagStream = queries.currentEventsByTag("CurrentPreviousBuckets", 
NoOffset)
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
 
       sub.request(2)
       sub.expectNextPF { case EventEnvelope(_, "p-3", 1, "e-1") => }
@@ -180,7 +180,7 @@ class EventsByTagStageSpec
         bucketSize)
 
       val tagStream = 
queries.currentEventsByTag("CurrentPreviousMultipleBuckets", NoOffset)
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
 
       sub.request(2)
       sub.expectNextPF { case EventEnvelope(_, "l-4", 1, "e-1") => }
@@ -205,7 +205,7 @@ class EventsByTagStageSpec
       writeTaggedEvent(PersistentRepr("p1e4", 4, "p-1"), Set(tag), 4, 
times(4), bucketSize)
 
       val tagStream = queries.currentEventsByTag(tag, NoOffset)
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
 
       sub.request(4)
       sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -236,7 +236,7 @@ class EventsByTagStageSpec
       writeTaggedEvent(PersistentRepr("p1e4", 4, "p-1"), Set(tag), 4, 
times(4), bucketSize)
 
       val tagStream: Source[EventEnvelope, NotUsed] = 
queries.currentEventsByTag(tag, NoOffset)
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
 
       sub.request(4)
       sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -258,7 +258,7 @@ class EventsByTagStageSpec
       writeTaggedEvent(thisBucket, PersistentRepr("p1e4", 4, "p-1"), Set(tag), 
4, bucketSize)
 
       val tagStream = queries.currentEventsByTag(tag, NoOffset)
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
       sub.request(5)
 
       sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -287,7 +287,7 @@ class EventsByTagStageSpec
       val tagStream = queries.currentEventsByTag(
         tag,
         
queries.timeBasedUUIDFrom(twoBucketsAgo.minusMinutes(1).toInstant(ZoneOffset.UTC).toEpochMilli))
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
 
       sub.request(3)
       val f0: PartialFunction[Any, Any] = { case EventEnvelope(_, "p-1", 10, 
"p1e10") => }
@@ -309,7 +309,7 @@ class EventsByTagStageSpec
       val stream: Source[EventEnvelope, NotUsed] =
         queries.eventsByTag("bogus", NoOffset)
 
-      val sub = stream.toMat(TestSink.probe[EventEnvelope])(Keep.right).run()
+      val sub = stream.toMat(TestSink[EventEnvelope]())(Keep.right).run()
       sub.request(1)
       sub.expectNoMessage(waitTime)
     }
@@ -319,7 +319,7 @@ class EventsByTagStageSpec
       send(ref, Tagged("e-1", Set("tag-3")))
 
       val blackSrc = queries.eventsByTag(tag = "tag-3", offset = NoOffset)
-      val probe = blackSrc.runWith(TestSink.probe[EventEnvelope])
+      val probe = blackSrc.runWith(TestSink[EventEnvelope]())
       probe.request(2)
       probe.expectNextPF { case EventEnvelope(_, "b", 1L, "e-1") => }
       probe.expectNoMessage(waitTime)
@@ -356,7 +356,7 @@ class EventsByTagStageSpec
         bucketSize)
 
       val tagStream = queries.eventsByTag("LivePreviousBuckets", NoOffset)
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
 
       sub.request(2)
       sub.expectNextPF { case EventEnvelope(_, "l-4", 1, "e-1") => }
@@ -394,7 +394,7 @@ class EventsByTagStageSpec
       writeTaggedEvent(PersistentRepr("p1e4", 4, "p-1"), Set(tag), 4, 
times(4), bucketSize)
 
       val tagStream = queries.eventsByTag(tag, NoOffset)
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
 
       sub.request(4)
       sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -423,7 +423,7 @@ class EventsByTagStageSpec
       writeTaggedEvent(PersistentRepr("p1e6", 6, "p-1"), Set(tag), 6, 
times(6), bucketSize)
 
       val tagStream = queries.eventsByTag(tag, NoOffset)
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
 
       sub.request(4)
       sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -457,7 +457,7 @@ class EventsByTagStageSpec
       writeTaggedEvent(PersistentRepr("p2e3", 3, "p-2"), Set(tag), 3, 
times(3), bucketSize)
 
       val tagStream = queries.eventsByTag(tag, NoOffset)
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
       sub.request(10)
 
       sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -487,7 +487,7 @@ class EventsByTagStageSpec
 
       writeTaggedEvent(nowTime.plusSeconds(1), PersistentRepr("e-3", 3, 
"p-1"), Set(tag), 3, bucketSize)
 
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
       sub.request(5)
       sub.expectNoMessage(waitTime)
 
@@ -510,7 +510,7 @@ class EventsByTagStageSpec
       writeTaggedEvent(thisBucket, PersistentRepr("p1e4", 4, "p-1"), Set(tag), 
4, bucketSize)
 
       val tagStream = queries.eventsByTag(tag, NoOffset)
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
       sub.request(5)
 
       sub.expectNextPF { case EventEnvelope(_, "p-1", 1, "p1e1") => }
@@ -538,7 +538,7 @@ class EventsByTagStageSpec
         queries.eventsByTag(
           tag,
           
queries.timeBasedUUIDFrom(nowTime.minusSeconds(1).toInstant(ZoneOffset.UTC).toEpochMilli))
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
 
       sub.request(4)
       sub.expectNoMessage(100.millis)
@@ -558,7 +558,7 @@ class EventsByTagStageSpec
       val nowTime = LocalDateTime.now(ZoneOffset.UTC)
       writeTaggedEvent(nowTime.plusSeconds(1), PersistentRepr("p1e1", 1, 
"p1"), Set(tag), 1, bucketSize)
       val tagStream = queries.eventsByTag(tag, NoOffset)
-      val sub = tagStream.runWith(TestSink.probe[EventEnvelope])
+      val sub = tagStream.runWith(TestSink[EventEnvelope]())
       sub.request(2)
       sub.expectNextPF { case EventEnvelope(_, "p1", 1, "p1e1") => }
       writeTaggedEvent(nowTime.plusSeconds(2), PersistentRepr("p1e10000", 
10000, "p1"), Set(tag), 10000, bucketSize)
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala
index f8ca36f..f0822bd 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala
@@ -57,7 +57,7 @@ class CassandraReadJournalSpec extends CassandraSpec(CassandraReadJournalSpec.co
       expectMsg("a-1-done")
 
       val src = javaQueries.eventsByPersistenceId("a", 0L, Long.MaxValue)
-      src.asScala.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("a").cancel()
+      src.asScala.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("a").cancel()
     }
 
     "start current eventsByPersistenceId query" in {
@@ -66,14 +66,14 @@ class CassandraReadJournalSpec extends 
CassandraSpec(CassandraReadJournalSpec.co
       expectMsg("b-1-done")
 
       val src = javaQueries.currentEventsByPersistenceId("b", 0L, Long.MaxValue)
-      src.asScala.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("b").expectComplete()
+      src.asScala.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("b").expectComplete()
     }
 
     "start eventsByTag query" in {
       val src = javaQueries.eventsByTag("a", Offset.noOffset)
       src.asScala
         .map(_.persistenceId)
-        .runWith(TestSink.probe[Any])
+        .runWith(TestSink[Any]())
         .request(10)
         .expectNext("a")
         .expectNoMessage(100.millis)
@@ -82,7 +82,7 @@ class CassandraReadJournalSpec extends 
CassandraSpec(CassandraReadJournalSpec.co
 
     "start current eventsByTag query" in {
       val src = javaQueries.currentEventsByTag("a", Offset.noOffset)
-      src.asScala.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("a").expectComplete()
+      src.asScala.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("a").expectComplete()
     }
   }
 }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala
index 42780df..44c0d17 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala
@@ -58,7 +58,7 @@ class CassandraReadJournalSpec extends CassandraSpec(CassandraReadJournalSpec.co
       expectMsg("a-1-done")
 
       val src = queries.eventsByPersistenceId("a", 0L, Long.MaxValue)
-      src.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("a").cancel()
+      src.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("a").cancel()
     }
 
     "start current eventsByPersistenceId query" in {
@@ -67,7 +67,7 @@ class CassandraReadJournalSpec extends 
CassandraSpec(CassandraReadJournalSpec.co
       expectMsg("b-1-done")
 
       val src = queries.currentEventsByPersistenceId("b", 0L, Long.MaxValue)
-      src.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("b").expectComplete()
+      src.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("b").expectComplete()
     }
 
     // these tests rely on events written in previous tests
@@ -75,7 +75,7 @@ class CassandraReadJournalSpec extends 
CassandraSpec(CassandraReadJournalSpec.co
       val src = queries.eventsByTag("a", NoOffset)
       src
         .map(_.persistenceId)
-        .runWith(TestSink.probe[Any])
+        .runWith(TestSink[Any]())
         .request(10)
         .expectNext("a")
         .expectNoMessage(100.millis)
@@ -84,7 +84,7 @@ class CassandraReadJournalSpec extends 
CassandraSpec(CassandraReadJournalSpec.co
 
     "start current eventsByTag query" in {
       val src = queries.currentEventsByTag("a", NoOffset)
-      src.map(_.persistenceId).runWith(TestSink.probe[Any]).request(10).expectNext("a").expectComplete()
+      src.map(_.persistenceId).runWith(TestSink[Any]()).request(10).expectNext("a").expectComplete()
     }
 
     "insert Cassandra metrics to Cassandra Metrics Registry" in {
diff --git a/project/PekkoConnectorsDependency.scala b/project/PekkoConnectorsDependency.scala
index 0d6d898..6ba49c1 100644
--- a/project/PekkoConnectorsDependency.scala
+++ b/project/PekkoConnectorsDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
 object PekkoConnectorsDependency extends PekkoDependency {
   override val checkProject: String = "pekko-connectors-cassandra"
   override val module: Option[String] = Some("connectors")
-  override val currentVersion: String = "1.1.0"
+  override val currentVersion: String = "1.2.0"
 }
diff --git a/project/PekkoCoreDependency.scala b/project/PekkoCoreDependency.scala
index 992921f..61d5bd7 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
 object PekkoCoreDependency extends PekkoDependency {
   override val checkProject: String = "pekko-cluster-sharding-typed"
   override val module: Option[String] = None
-  override val currentVersion: String = "1.1.5"
+  override val currentVersion: String = "1.3.0"
 }
diff --git a/project/PekkoManagementDependency.scala b/project/PekkoManagementDependency.scala
index 342018d..ce712ec 100644
--- a/project/PekkoManagementDependency.scala
+++ b/project/PekkoManagementDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
 object PekkoManagementDependency extends PekkoDependency {
   override val checkProject: String = "pekko-discovery-aws-api-async"
   override val module: Option[String] = Some("management")
-  override val currentVersion: String = "1.1.0-M1"
+  override val currentVersion: String = "1.1.1"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to