This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 7aa685f92 use pekko 1.3.0 (#1290)
7aa685f92 is described below

commit 7aa685f92a2cf06b8a17d0036c900222b3725e0d
Author: PJ Fanning <[email protected]>
AuthorDate: Thu Nov 20 09:36:11 2025 +0100

    use pekko 1.3.0 (#1290)
    
    * use pekko 1.3.0
    
    * try to fix TestSource/TestSink probe issues
    
    * more changes
    
    * compile issues
    
    * compile issues
    
    * compile issues
    
    * Update MqttFrameStageSpec.scala
---
 amqp/src/test/java/docs/javadsl/AmqpDocsTest.java  |  2 +-
 .../amqp/javadsl/AmqpConnectorsTest.java           |  2 +-
 .../connectors/amqp/javadsl/AmqpFlowTest.java      |  6 +++---
 .../test/scala/docs/scaladsl/AmqpDocsSpec.scala    |  2 +-
 .../amqp/scaladsl/AmqpConnectorsSpec.scala         |  6 +++---
 .../connectors/amqp/scaladsl/AmqpFlowSpec.scala    | 23 +++++++++++-----------
 .../docs/scaladsl/AvroParquetSourceSpec.scala      |  4 ++--
 .../eventbridge/EventBridgePublishMockSpec.scala   | 10 +++++-----
 .../scala/docs/scaladsl/AwsLambdaFlowSpec.scala    |  4 ++--
 .../cassandra/javadsl/CassandraSessionSpec.scala   |  4 ++--
 .../test/scala/docs/scaladsl/CsvParsingSpec.scala  |  5 ++---
 .../impl/ElasticsearchSimpleFlowStageTest.scala    | 10 ++++------
 .../impl/ElasticsearchSourcStageTest.scala         |  2 +-
 .../docs/scaladsl/FileTailSourceExtrasSpec.scala   |  4 ++--
 .../scala/docs/scaladsl/LogRotatorSinkSpec.scala   | 12 +++++------
 .../file/impl/archive/ZipArchiveFlowTest.scala     |  5 ++---
 .../stream/connectors/ftp/CommonFtpStageTest.java  |  4 ++--
 .../stream/connectors/ftp/CommonFtpStageSpec.scala | 22 ++++++++++-----------
 .../test/scala/docs/scaladsl/IntegrationSpec.scala |  2 +-
 .../connectors/google/util/AnnotateLastSpec.scala  | 10 +++++-----
 .../scaladsl/JmsBufferedAckConnectorsSpec.scala    |  4 ++--
 .../jakartams/scaladsl/JmsAckConnectorsSpec.scala  |  4 ++--
 .../scaladsl/JmsBufferedAckConnectorsSpec.scala    |  4 ++--
 .../jms/scaladsl/JmsAckConnectorsSpec.scala        |  4 ++--
 .../connectors/kinesis/KinesisFlowSpec.scala       | 10 ++++------
 .../kinesis/KinesisSchedulerSourceSpec.scala       |  7 +++----
 .../connectors/kinesis/KinesisSourceSpec.scala     | 12 +++++------
 .../kinesisfirehose/KinesisFirehoseFlowSpec.scala  |  5 ++---
 .../mqtt/streaming/impl/MqttFrameStageSpec.scala   | 16 +++++++--------
 .../src/test/java/docs/javadsl/MqttSourceTest.java |  2 +-
 .../test/scala/docs/scaladsl/MqttSourceSpec.scala  |  8 ++++----
 .../src/test/java/docs/javadsl/MqttSourceTest.java |  2 +-
 .../test/scala/docs/scaladsl/MqttSourceSpec.scala  |  8 ++++----
 project/PekkoCoreDependency.scala                  |  2 +-
 .../connectors/sns/SnsPublishMockingSpec.scala     | 14 ++++++-------
 .../sqs/scaladsl/SqsPublishSinkSpec.scala          | 18 ++++++++---------
 .../sqs/scaladsl/SqsSourceMockSpec.scala           |  6 +++---
 .../text/scaladsl/CharsetCodingFlowsSpec.scala     |  8 +++-----
 udp/src/test/java/docs/javadsl/UdpTest.java        |  8 ++++----
 udp/src/test/scala/docs/scaladsl/UdpSpec.scala     | 20 ++++++++-----------
 40 files changed, 141 insertions(+), 160 deletions(-)

diff --git a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java 
b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java
index 66a5aa727..06bb08309 100644
--- a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java
+++ b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java
@@ -145,7 +145,7 @@ public class AmqpDocsTest {
         Source.from(input)
             .map(ByteString::fromString)
             .viaMat(ampqRpcFlow, Keep.right())
-            .toMat(TestSink.probe(system), Keep.both())
+            .toMat(TestSink.create(system), Keep.both())
             .run(system);
     // #create-rpc-flow
     result.first().toCompletableFuture().get(3, TimeUnit.SECONDS);
diff --git 
a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpConnectorsTest.java
 
b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpConnectorsTest.java
index 225c9178d..e48526867 100644
--- 
a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpConnectorsTest.java
+++ 
b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpConnectorsTest.java
@@ -149,7 +149,7 @@ public class AmqpConnectorsTest {
             .map(WriteMessage::create)
             .viaMat(ampqRpcFlow, Keep.right())
             .mapAsync(1, cm -> cm.ack().thenApply(unused -> cm.message()))
-            .toMat(TestSink.probe(system), Keep.both())
+            .toMat(TestSink.create(system), Keep.both())
             .run(system);
 
     result.first().toCompletableFuture().get(5, TimeUnit.SECONDS);
diff --git 
a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
 
b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
index 06f5e435f..bb0f9866c 100644
--- 
a/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
+++ 
b/amqp/src/test/java/org/apache/pekko/stream/connectors/amqp/javadsl/AmqpFlowTest.java
@@ -108,7 +108,7 @@ public class AmqpFlowTest {
         Source.from(input)
             .map(s -> WriteMessage.create(ByteString.fromString(s)))
             .via(flow)
-            .toMat(TestSink.probe(system), Keep.right())
+            .toMat(TestSink.create(system), Keep.right())
             .run(system);
 
     result
@@ -142,7 +142,7 @@ public class AmqpFlowTest {
             .map(s -> WriteMessage.create(ByteString.fromString(s)))
             .via(flowWithContext)
             .asSource()
-            .toMat(TestSink.probe(system), Keep.right())
+            .toMat(TestSink.create(system), Keep.right())
             .run(system);
 
     result
@@ -165,7 +165,7 @@ public class AmqpFlowTest {
         Source.from(input)
             .map(s -> 
Pair.create(WriteMessage.create(ByteString.fromString(s)), s))
             .via(flow)
-            .toMat(TestSink.probe(system), Keep.right())
+            .toMat(TestSink.create(system), Keep.right())
             .run(system);
 
     result
diff --git a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala 
b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
index dab24b6a3..d0956f465 100644
--- a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
+++ b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
@@ -103,7 +103,7 @@ class AmqpDocsSpec extends AmqpSpec {
       val (rpcQueueF: Future[String], probe: TestSubscriber.Probe[ByteString]) 
= Source(input)
         .map(s => ByteString(s))
         .viaMat(amqpRpcFlow)(Keep.right)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
       // #create-rpc-flow
       rpcQueueF.futureValue
diff --git 
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
 
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
index dad8f3bb7..bb038874a 100644
--- 
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
+++ 
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
@@ -103,7 +103,7 @@ class AmqpConnectorsSpec extends AmqpSpec with 
ScalaCheckDrivenPropertyChecks {
 
         val input = Vector("one", "two", "three", "four", "five")
         val (rpcQueueF, probe) =
-          Source(input).map(s => 
ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run()
+          Source(input).map(s => 
ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink())(Keep.both).run()
         rpcQueueF.futureValue
 
         val amqpSink = AmqpSink.replyTo(
@@ -134,7 +134,7 @@ class AmqpConnectorsSpec extends AmqpSpec with 
ScalaCheckDrivenPropertyChecks {
       Source
         .empty[ByteString]
         .via(AmqpRpcFlow.simple(AmqpWriteSettings(connectionProvider)))
-        .runWith(TestSink.probe)
+        .runWith(TestSink())
         .ensureSubscription()
         .expectComplete()
 
@@ -358,7 +358,7 @@ class AmqpConnectorsSpec extends AmqpSpec with 
ScalaCheckDrivenPropertyChecks {
             .map(bytes => WriteMessage(bytes))
             .viaMat(amqpRpcFlow)(Keep.right)
             .mapAsync(1)(cm => cm.ack().map(_ => cm.message))
-            .toMat(TestSink.probe)(Keep.both)
+            .toMat(TestSink())(Keep.both)
             .run()
         rpcQueueF.futureValue
 
diff --git 
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpFlowSpec.scala
 
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpFlowSpec.scala
index 421da76cf..d45ec2f60 100644
--- 
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpFlowSpec.scala
+++ 
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpFlowSpec.scala
@@ -160,7 +160,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with 
BeforeAndAfterEach {
           .map(s => WriteMessage(ByteString(s)))
           .viaMat(mockedFlowWithContextAndConfirm)(Keep.right)
           .asSource
-          .toMat(TestSink.probe)(Keep.both)
+          .toMat(TestSink())(Keep.both)
           .run()
 
       probe.request(input.size)
@@ -242,7 +242,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with 
BeforeAndAfterEach {
           .map(s => WriteMessage(ByteString(s)))
           .viaMat(mockedUnorderedFlowWithPassThrough)(Keep.right)
           .asSource
-          .toMat(TestSink.probe)(Keep.both)
+          .toMat(TestSink())(Keep.both)
           .run()
 
       probe.request(input.size)
@@ -277,7 +277,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with 
BeforeAndAfterEach {
       Source(input)
         .map(s => WriteMessage(ByteString(s)))
         .viaMat(flow)(Keep.right)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     val messages = probe.request(input.size).expectNextN(input.size)
@@ -295,7 +295,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with 
BeforeAndAfterEach {
         .asSourceWithContext(identity)
         .map(s => WriteMessage(ByteString(s)))
         .viaMat(flow)(Keep.right)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     val messages = probe.request(input.size).expectNextN(input.size)
@@ -312,7 +312,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with 
BeforeAndAfterEach {
       Source(input)
         .map(s => (WriteMessage(ByteString(s)), s))
         .viaMat(flow)(Keep.right)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     val messages = probe.request(input.size).expectNextN(input.size)
@@ -350,7 +350,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with 
BeforeAndAfterEach {
       Source(input)
         .map(s => WriteMessage(ByteString(s)))
         .viaMat(flow)(Keep.right)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     probe.request(input.size)
@@ -378,7 +378,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with 
BeforeAndAfterEach {
       Source(input)
         .map(s => WriteMessage(ByteString(s)))
         .viaMat(flow)(Keep.right)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     val messages = probe.request(input.size).expectNextN(input.size)
@@ -397,7 +397,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with 
BeforeAndAfterEach {
       Source(input)
         .map(s => WriteMessage(ByteString(s)))
         .viaMat(flow)(Keep.right)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     probe.request(input.size)
@@ -435,7 +435,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with 
BeforeAndAfterEach {
       Source(1 to sourceElements)
         .map(s => WriteMessage(ByteString(s)))
         .viaMat(flow)(Keep.right)
-        .toMat(TestSink.probe)(Keep.right)
+        .toMat(TestSink())(Keep.right)
         .run()
 
     probe.request(sourceElements)
@@ -451,11 +451,10 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with 
BeforeAndAfterEach {
     val input = Vector("one", "two")
 
     val (sourceProbe, sinkProbe) =
-      TestSource
-        .probe[String]
+      TestSource[String]()
         .map(s => WriteMessage(ByteString(s)))
         .viaMat(flow)(Keep.left)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     sinkProbe.request(input.size)
diff --git 
a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala 
b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
index 2fa88db1e..c385ec8e4 100644
--- a/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
+++ b/avroparquet/src/test/scala/docs/scaladsl/AvroParquetSourceSpec.scala
@@ -56,7 +56,7 @@ class AvroParquetSourceSpec
       // #init-source
       val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
       // #init-source
-      val sink = source.runWith(TestSink.probe)
+      val sink = source.runWith(TestSink())
 
       // then
       val result: Seq[GenericRecord] = sink.toStrict(3.seconds)
@@ -80,7 +80,7 @@ class AvroParquetSourceSpec
       // #init-source
       val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
       // #init-source
-      val sink = source.runWith(TestSink.probe)
+      val sink = source.runWith(TestSink())
 
       // then
       val result: Seq[GenericRecord] = sink.toStrict(3.seconds)
diff --git 
a/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/EventBridgePublishMockSpec.scala
 
b/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/EventBridgePublishMockSpec.scala
index 35cf55e9e..cf3f89f2c 100644
--- 
a/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/EventBridgePublishMockSpec.scala
+++ 
b/aws-event-bridge/src/test/scala/org/apache/pekko/stream/connectors/aws/eventbridge/EventBridgePublishMockSpec.scala
@@ -56,7 +56,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with 
DefaultTestContext wit
     
when(eventBridgeClient.putEvents(putRequest)).thenReturn(CompletableFuture.completedFuture(putResult))
 
     val (probe, future) =
-      
TestSource.probe[PutEventsRequestEntry].via(EventBridgePublisher.flow()).toMat(Sink.seq)(Keep.both).run()
+      
TestSource[PutEventsRequestEntry]().via(EventBridgePublisher.flow()).toMat(Sink.seq)(Keep.both).run()
 
     probe.sendNext(putRequestEntry).sendComplete()
 
@@ -71,7 +71,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with 
DefaultTestContext wit
     
when(eventBridgeClient.putEvents(any[PutEventsRequest]())).thenReturn(CompletableFuture.completedFuture(putResult))
 
     val (probe, future) =
-      
TestSource.probe[PutEventsRequestEntry].via(EventBridgePublisher.flow()).toMat(Sink.seq)(Keep.both).run()
+      
TestSource[PutEventsRequestEntry]().via(EventBridgePublisher.flow()).toMat(Sink.seq)(Keep.both).run()
 
     probe
       .sendNext(entryDetail("eb-message-1"))
@@ -97,7 +97,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with 
DefaultTestContext wit
     
when(eventBridgeClient.putEvents(any[PutEventsRequest]())).thenReturn(CompletableFuture.completedFuture(putResult))
 
     val (probe, future) =
-      
TestSource.probe[Seq[PutEventsRequestEntry]].via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
+      
TestSource[Seq[PutEventsRequestEntry]]().via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
     probe
       .sendNext(Seq(entryDetail("eb-message-1"), entryDetail("eb-message-2"), 
entryDetail("eb-message-3")))
       .sendComplete()
@@ -118,7 +118,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with 
DefaultTestContext wit
     when(eventBridgeClient.putEvents(meq(publishRequest))).thenReturn(promise)
 
     val (probe, future) =
-      
TestSource.probe[Seq[PutEventsRequestEntry]].via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
+      
TestSource[Seq[PutEventsRequestEntry]]().via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
     probe.sendNext(Seq(entryDetail("eb-message"))).sendComplete()
 
     a[RuntimeException] should be thrownBy {
@@ -132,7 +132,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with 
DefaultTestContext wit
     case class MyCustomException(message: String) extends Exception(message)
 
     val (probe, future) =
-      
TestSource.probe[Seq[PutEventsRequestEntry]].via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
+      
TestSource[Seq[PutEventsRequestEntry]]().via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
     probe.sendError(MyCustomException("upstream failure"))
 
     a[MyCustomException] should be thrownBy {
diff --git a/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala 
b/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala
index 4cad3b954..0b3825596 100644
--- a/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala
+++ b/awslambda/src/test/scala/docs/scaladsl/AwsLambdaFlowSpec.scala
@@ -77,7 +77,7 @@ class AwsLambdaFlowSpec
           CompletableFuture.completedFuture(invokeResponse)
       })
 
-      val (probe, future) = 
TestSource.probe[InvokeRequest].via(lambdaFlow).toMat(Sink.seq)(Keep.both).run()
+      val (probe, future) = 
TestSource[InvokeRequest]().via(lambdaFlow).toMat(Sink.seq)(Keep.both).run()
       probe.sendNext(invokeRequest)
       probe.sendComplete()
 
@@ -98,7 +98,7 @@ class AwsLambdaFlowSpec
         }
       })
 
-      val (probe, future) = 
TestSource.probe[InvokeRequest].via(lambdaFlow).toMat(Sink.seq)(Keep.both).run()
+      val (probe, future) = 
TestSource[InvokeRequest]().via(lambdaFlow).toMat(Sink.seq)(Keep.both).run()
 
       probe.sendNext(invokeFailureRequest)
       probe.sendComplete()
diff --git 
a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala
 
b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala
index c47484118..646647c39 100644
--- 
a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala
+++ 
b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala
@@ -112,7 +112,7 @@ final class CassandraSessionSpec extends 
CassandraSpecBase(ActorSystem("Cassandr
       val stmt = await(session.prepare(s"SELECT count FROM $dataTable WHERE 
partition = ?"))
       val bound = stmt.bind("A")
       val rows = session.select(bound).asScala
-      val probe = rows.map(_.getLong("count")).runWith(TestSink.probe[Long])
+      val probe = rows.map(_.getLong("count")).runWith(TestSink[Long]())
       probe.within(10.seconds) {
         probe.request(10).expectNextUnordered(1L, 2L, 3L, 4L).expectComplete()
       }
@@ -120,7 +120,7 @@ final class CassandraSessionSpec extends 
CassandraSpecBase(ActorSystem("Cassandr
 
     "select and bind as Source" in {
       val rows = session.select(s"SELECT count FROM $dataTable WHERE partition 
= ?", "B").asScala
-      val probe = rows.map(_.getLong("count")).runWith(TestSink.probe[Long])
+      val probe = rows.map(_.getLong("count")).runWith(TestSink[Long]())
       probe.within(10.seconds) {
         probe.request(10).expectNextUnordered(5L, 6L).expectComplete()
       }
diff --git a/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala 
b/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala
index 526dda4c1..8894b0071 100644
--- a/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala
+++ b/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala
@@ -129,11 +129,10 @@ class CsvParsingSpec extends CsvSpec {
     }
 
     "emit completion even without new line at end" in assertAllStagesStopped {
-      val (source, sink) = TestSource
-        .probe[ByteString]
+      val (source, sink) = TestSource[ByteString]()
         .via(CsvParsing.lineScanner())
         .map(_.map(_.utf8String))
-        .toMat(TestSink.probe[List[String]])(Keep.both)
+        .toMat(TestSink[List[String]]())(Keep.both)
         .run()
       source.sendNext(ByteString("eins,zwei,drei\nuno,dos,tres\n1,2,3"))
       sink.request(3)
diff --git 
a/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala
 
b/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala
index fe5cb0f29..49dbdacc7 100644
--- 
a/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala
+++ 
b/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala
@@ -52,14 +52,13 @@ class ElasticsearchSimpleFlowStageTest
     "stream ends" should {
       "emit element only when downstream requests" in {
         val (upstream, downstream) =
-          TestSource
-            .probe[(immutable.Seq[WriteMessage[String, NotUsed]], 
immutable.Seq[WriteResult[String, NotUsed]])]
+          TestSource[(immutable.Seq[WriteMessage[String, NotUsed]], 
immutable.Seq[WriteResult[String, NotUsed]])]()
             .via(
               new impl.ElasticsearchSimpleFlowStage[String, NotUsed](
                 ElasticsearchParams.V7("es-simple-flow-index"),
                 settings,
                 writer))
-            .toMat(TestSink.probe)(Keep.both)
+            .toMat(TestSink())(Keep.both)
             .run()
 
         upstream.sendNext(dummyMessages)
@@ -77,14 +76,13 @@ class ElasticsearchSimpleFlowStageTest
     "client cannot connect to ES" should {
       "stop the stream" in {
         val (upstream, downstream) =
-          TestSource
-            .probe[(immutable.Seq[WriteMessage[String, NotUsed]], 
immutable.Seq[WriteResult[String, NotUsed]])]
+          TestSource[(immutable.Seq[WriteMessage[String, NotUsed]], 
immutable.Seq[WriteResult[String, NotUsed]])]()
             .via(
               new impl.ElasticsearchSimpleFlowStage[String, NotUsed](
                 ElasticsearchParams.V7("es-simple-flow-index"),
                 
settings.withConnection(ElasticsearchConnectionSettings("http://wololo:9202")),
                 writer))
-            .toMat(TestSink.probe)(Keep.both)
+            .toMat(TestSink())(Keep.both)
             .run()
 
         upstream.sendNext(dummyMessages)
diff --git 
a/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourcStageTest.scala
 
b/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourcStageTest.scala
index c212a735b..d39464f47 100644
--- 
a/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourcStageTest.scala
+++ 
b/elasticsearch/src/test/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourcStageTest.scala
@@ -46,7 +46,7 @@ class ElasticsearchSourcStageTest
               Map("query" -> """{ "match_all":{}}"""),
               
ElasticsearchSourceSettings(ElasticsearchConnectionSettings("http://wololo:9202")),
               (json: String) => ScrollResponse(Some(json), None)))
-          .toMat(TestSink.probe)(Keep.right)
+          .toMat(TestSink())(Keep.right)
           .run()
 
         downstream.request(1)
diff --git a/file/src/test/scala/docs/scaladsl/FileTailSourceExtrasSpec.scala 
b/file/src/test/scala/docs/scaladsl/FileTailSourceExtrasSpec.scala
index 92096c4f6..a2a6878ce 100644
--- a/file/src/test/scala/docs/scaladsl/FileTailSourceExtrasSpec.scala
+++ b/file/src/test/scala/docs/scaladsl/FileTailSourceExtrasSpec.scala
@@ -69,7 +69,7 @@ class FileTailSourceExtrasSpec
 
       // #shutdown-on-delete
 
-      val probe = stream.toMat(TestSink.probe)(Keep.right).run()
+      val probe = stream.toMat(TestSink())(Keep.right).run()
 
       val result = probe.requestNext()
       result shouldEqual "a"
@@ -96,7 +96,7 @@ class FileTailSourceExtrasSpec
 
       // #shutdown-on-idle-timeout
 
-      val probe = stream.toMat(TestSink.probe)(Keep.right).run()
+      val probe = stream.toMat(TestSink())(Keep.right).run()
 
       val result = probe.requestNext()
       result shouldEqual "a"
diff --git a/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala 
b/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
index 433acb853..22fbf6a52 100644
--- a/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
+++ b/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
@@ -296,7 +296,7 @@ class LogRotatorSinkSpec
     "upstream fail before first file creation" in assertAllStagesStopped {
       val (triggerFunctionCreator, files) = fileLengthTriggerCreator()
       val (probe, completion) =
-        
TestSource.probe[ByteString].toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
+        
TestSource[ByteString]().toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
 
       val ex = new Exception("my-exception")
       probe.sendError(ex)
@@ -307,7 +307,7 @@ class LogRotatorSinkSpec
     "upstream fail after first file creation" in assertAllStagesStopped {
       val (triggerFunctionCreator, files) = fileLengthTriggerCreator()
       val (probe, completion) =
-        
TestSource.probe[ByteString].toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
+        
TestSource[ByteString]().toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
 
       val ex = new Exception("my-exception")
       probe.sendNext(ByteString("test"))
@@ -326,7 +326,7 @@ class LogRotatorSinkSpec
           }
       }
       val (probe, completion) =
-        
TestSource.probe[ByteString].toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
+        
TestSource[ByteString]().toMat(LogRotatorSink(triggerFunctionCreator))(Keep.both).run()
       probe.sendNext(ByteString("test"))
       the[Exception] thrownBy Await.result(completion, 3.seconds) shouldBe ex
     }
@@ -340,8 +340,7 @@ class LogRotatorSinkSpec
           }
       }
       val (probe, completion) =
-        TestSource
-          .probe[ByteString]
+        TestSource[ByteString]()
           .toMat(LogRotatorSink(triggerFunctionCreator, 
Set(StandardOpenOption.READ)))(Keep.both)
           .run()
       probe.sendNext(ByteString("test"))
@@ -370,8 +369,7 @@ class LogRotatorSinkSpec
         }
     }
     val (probe, completion) =
-      TestSource
-        .probe[ByteString]
+      TestSource[ByteString]()
         .toMat(
           LogRotatorSink.withSinkFactory(
             triggerGeneratorCreator = triggerFunctionCreator,
diff --git 
a/file/src/test/scala/org/apache/pekko/stream/connectors/file/impl/archive/ZipArchiveFlowTest.scala
 
b/file/src/test/scala/org/apache/pekko/stream/connectors/file/impl/archive/ZipArchiveFlowTest.scala
index e681001db..f2b8e5e4b 100644
--- 
a/file/src/test/scala/org/apache/pekko/stream/connectors/file/impl/archive/ZipArchiveFlowTest.scala
+++ 
b/file/src/test/scala/org/apache/pekko/stream/connectors/file/impl/archive/ZipArchiveFlowTest.scala
@@ -33,10 +33,9 @@ class ZipArchiveFlowTest
     "stream ends" should {
       "emit element only when downstream requests" in {
         val (upstream, downstream) =
-          TestSource
-            .probe[ByteString]
+          TestSource[ByteString]()
             .via(new ZipArchiveFlow())
-            .toMat(TestSink.probe)(Keep.both)
+            .toMat(TestSink())(Keep.both)
             .run()
 
         
upstream.sendNext(FileByteStringSeparators.createStartingByteString("test"))
diff --git 
a/ftp/src/test/java/org/apache/pekko/stream/connectors/ftp/CommonFtpStageTest.java
 
b/ftp/src/test/java/org/apache/pekko/stream/connectors/ftp/CommonFtpStageTest.java
index 331e523be..e122a0d77 100644
--- 
a/ftp/src/test/java/org/apache/pekko/stream/connectors/ftp/CommonFtpStageTest.java
+++ 
b/ftp/src/test/java/org/apache/pekko/stream/connectors/ftp/CommonFtpStageTest.java
@@ -68,7 +68,7 @@ interface CommonFtpStageTest extends BaseSupport, 
PekkoSupport {
     Source<FtpFile, NotUsed> source = getBrowserSource(basePath);
 
     Pair<NotUsed, TestSubscriber.Probe<FtpFile>> pairResult =
-        source.toMat(TestSink.probe(system), Keep.both()).run(system);
+        source.toMat(TestSink.create(system), Keep.both()).run(system);
     TestSubscriber.Probe<FtpFile> probe = pairResult.second();
     probe.request(demand).expectNextN(numFiles);
     probe.expectComplete();
@@ -82,7 +82,7 @@ interface CommonFtpStageTest extends BaseSupport, 
PekkoSupport {
 
     Source<ByteString, CompletionStage<IOResult>> source = 
getIOSource(fileName);
     Pair<CompletionStage<IOResult>, TestSubscriber.Probe<ByteString>> 
pairResult =
-        source.toMat(TestSink.probe(system), Keep.both()).run(system);
+        source.toMat(TestSink.create(system), Keep.both()).run(system);
     TestSubscriber.Probe<ByteString> probe = pairResult.second();
     probe.request(100).expectNextOrComplete();
 
diff --git 
a/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/CommonFtpStageSpec.scala
 
b/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/CommonFtpStageSpec.scala
index 177503d8c..cb34d5561 100644
--- 
a/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/CommonFtpStageSpec.scala
+++ 
b/ftp/src/test/scala/org/apache/pekko/stream/connectors/ftp/CommonFtpStageSpec.scala
@@ -107,7 +107,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
       val basePath = ""
       generateFiles(30, 10, basePath)
       val probe =
-        listFiles(basePath).toMat(TestSink.probe)(Keep.right).run()
+        listFiles(basePath).toMat(TestSink())(Keep.right).run()
       probe.request(40).expectNextN(30)
       probe.expectComplete()
     }
@@ -116,7 +116,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
       val basePath = "/foo"
       generateFiles(30, 10, basePath)
       val probe =
-        listFiles(basePath).toMat(TestSink.probe)(Keep.right).run()
+        listFiles(basePath).toMat(TestSink())(Keep.right).run()
       probe.request(40).expectNextN(30)
       probe.expectComplete()
     }
@@ -125,7 +125,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
       val basePath = "/foo"
       generateFiles(30, 10, basePath)
       val probe =
-        listFilesWithFilter(basePath, f => 
false).toMat(TestSink.probe)(Keep.right).run()
+        listFilesWithFilter(basePath, f => 
false).toMat(TestSink())(Keep.right).run()
       probe.request(40).expectNextN(12) // 9 files, 3 directories
       probe.expectComplete()
 
@@ -136,7 +136,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
       generateFiles(30, 10, basePath)
       val probe =
         listFilesWithFilter(basePath, f => f.name.contains("1"))
-          .toMat(TestSink.probe)(Keep.right)
+          .toMat(TestSink())(Keep.right)
           .run()
       probe.request(40).expectNextN(21) // 9 files in root, 2 directories, 10 
files in dir_1
       probe.expectComplete()
@@ -146,7 +146,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
     "list all files in sparse directory tree" in assertAllStagesStopped {
       putFileOnFtp("foo/bar/baz/foobar/sample")
       val probe =
-        listFiles("/").toMat(TestSink.probe)(Keep.right).run()
+        listFiles("/").toMat(TestSink())(Keep.right).run()
       probe.request(2).expectNextN(1)
       probe.expectComplete()
     }
@@ -155,7 +155,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
       putFileOnFtp("foo/bar/baz/foobar/sample")
       val probe =
         listFilesWithFilter("/", _ => true, emitTraversedDirectories = true)
-          .toMat(TestSink.probe)(Keep.right)
+          .toMat(TestSink())(Keep.right)
           .run()
       probe.request(10).expectNextN(5) // foo, bar, baz, foobar, and sample_1 
= 5 files
       probe.expectComplete()
@@ -189,7 +189,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
       val fileName = "sample_io_" + Instant.now().getNano
       putFileOnFtp(fileName)
       val (result, probe) =
-        retrieveFromPath(s"/$fileName").toMat(TestSink.probe)(Keep.both).run()
+        retrieveFromPath(s"/$fileName").toMat(TestSink())(Keep.both).run()
       probe.request(100).expectNextOrComplete()
 
       val expectedNumOfBytes = getDefaultContent.getBytes().length
@@ -201,7 +201,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
       val offset = 10L
       putFileOnFtp(fileName)
       val (result, probe) =
-        retrieveFromPathWithOffset(s"/$fileName", 
offset).toMat(TestSink.probe)(Keep.both).run()
+        retrieveFromPathWithOffset(s"/$fileName", 
offset).toMat(TestSink())(Keep.both).run()
       probe.request(100).expectNextOrComplete()
 
       val expectedNumOfBytes = getDefaultContent.getBytes().length - offset
@@ -213,7 +213,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
       val fileContents = new Array[Byte](2000020)
       Random.nextBytes(fileContents)
       putFileOnFtpWithContents(fileName, fileContents)
-      val (result, probe) = 
retrieveFromPath(s"/$fileName").toMat(TestSink.probe)(Keep.both).run()
+      val (result, probe) = 
retrieveFromPath(s"/$fileName").toMat(TestSink())(Keep.both).run()
       probe.request(1000).expectNextOrComplete()
 
       val expectedNumOfBytes = fileContents.length
@@ -227,7 +227,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
       Random.nextBytes(fileContents)
       putFileOnFtpWithContents(fileName, fileContents)
       val (result, probe) =
-        retrieveFromPathWithOffset(s"/$fileName", 
offset).toMat(TestSink.probe)(Keep.both).run()
+        retrieveFromPathWithOffset(s"/$fileName", 
offset).toMat(TestSink())(Keep.both).run()
       probe.request(1000).expectNextOrComplete()
 
       val expectedNumOfBytes = fileContents.length - offset
@@ -242,7 +242,7 @@ trait CommonFtpStageSpec extends BaseSpec with Eventually {
       generateFiles(numOfFiles, numOfFiles, basePath)
       val probe = listFiles(basePath)
         .mapAsyncUnordered(1)(file => retrieveFromPath(file.path, fromRoot = 
true).to(Sink.ignore).run())
-        .toMat(TestSink.probe)(Keep.right)
+        .toMat(TestSink())(Keep.right)
         .run()
       val result = probe.request(numOfFiles + 1).expectNextN(numOfFiles)
       probe.expectComplete()
diff --git 
a/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala 
b/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala
index f7a1d2fab..b563f41c1 100644
--- a/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala
+++ b/google-cloud-pub-sub/src/test/scala/docs/scaladsl/IntegrationSpec.scala
@@ -126,7 +126,7 @@ class IntegrationSpec
       // the acknowledged message should not arrive again
       val (stream, result2) = GooglePubSub
         .subscribe(topic2subscription, config)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       result2.ensureSubscription()
diff --git 
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
 
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
index d6653269c..535c5148d 100644
--- 
a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
+++ 
b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/util/AnnotateLastSpec.scala
@@ -33,7 +33,7 @@ class AnnotateLastSpec
   "AnnotateLast" should {
 
     "indicate last element" in {
-      val probe = Source(1 to 3).via(AnnotateLast[Int]).runWith(TestSink.probe)
+      val probe = Source(1 to 3).via(AnnotateLast[Int]).runWith(TestSink())
       probe.requestNext(NotLast(1))
       probe.requestNext(NotLast(2))
       probe.requestNext(Last(3))
@@ -41,24 +41,24 @@ class AnnotateLastSpec
     }
 
     "indicate first element is last if only one element" in {
-      val probe = 
Source.single(1).via(AnnotateLast[Int]).runWith(TestSink.probe)
+      val probe = Source.single(1).via(AnnotateLast[Int]).runWith(TestSink())
       probe.requestNext(Last(1))
       probe.expectComplete()
     }
 
     "do nothing when stream is empty" in {
-      val probe = 
Source.empty[Nothing].via(AnnotateLast[Nothing]).runWith(TestSink.probe)
+      val probe = 
Source.empty[Nothing].via(AnnotateLast[Nothing]).runWith(TestSink())
       probe.expectSubscriptionAndComplete()
     }
 
     "return zero value when stream is empty using zero apply" in {
-      val probe = 
Source.empty[Null].via(AnnotateLast[Null](null)).runWith(TestSink.probe)
+      val probe = 
Source.empty[Null].via(AnnotateLast[Null](null)).runWith(TestSink())
       probe.requestNext(Last(null))
       probe.expectComplete()
     }
 
     "don't return zero value if stream is non empty using zero apply" in {
-      val probe = 
Source.single(1).via(AnnotateLast[Int](0)).runWith(TestSink.probe)
+      val probe = 
Source.single(1).via(AnnotateLast[Int](0)).runWith(TestSink())
       probe.requestNext(Last(1))
       probe.expectComplete()
     }
diff --git 
a/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala 
b/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
index 7738f3a89..cdbedf6dc 100644
--- a/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
+++ b/jakartams/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
@@ -386,7 +386,7 @@ class JmsBufferedAckConnectorsSpec extends 
JmsSharedServerSpec {
 
           }
         }
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       probe.requestNext(convertSpanToDuration(patienceConfig.timeout)) 
shouldBe Some(aMessage)
@@ -399,7 +399,7 @@ class JmsBufferedAckConnectorsSpec extends 
JmsSharedServerSpec {
       probe.expectComplete()
 
       // Consuming again should give us no elements, as msg was acked and 
therefore removed from the broker
-      val (emptyConsumerControl, emptySourceProbe) = 
source.toMat(TestSink.probe)(Keep.both).run()
+      val (emptyConsumerControl, emptySourceProbe) = 
source.toMat(TestSink())(Keep.both).run()
       emptySourceProbe.ensureSubscription().expectNoMessage()
       emptyConsumerControl.shutdown()
       emptySourceProbe.expectComplete()
diff --git 
a/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala
 
b/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala
index 3795c6d10..e9ef7c491 100644
--- 
a/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala
+++ 
b/jakartams/src/test/scala/org/apache/pekko/stream/connectors/jakartams/scaladsl/JmsAckConnectorsSpec.scala
@@ -422,7 +422,7 @@ class JmsAckConnectorsSpec extends JmsSpec {
             case _                    => None
           }
         }
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       probe.requestNext(convertSpanToDuration(patienceConfig.timeout)) 
shouldBe Some(aMessage)
@@ -435,7 +435,7 @@ class JmsAckConnectorsSpec extends JmsSpec {
       probe.expectComplete()
 
       // Consuming again should give us no elements, as msg was acked and 
therefore removed from the broker
-      val (emptyConsumerControl, emptySourceProbe) = 
source.toMat(TestSink.probe)(Keep.both).run()
+      val (emptyConsumerControl, emptySourceProbe) = 
source.toMat(TestSink())(Keep.both).run()
       emptySourceProbe.ensureSubscription().expectNoMessage()
       emptyConsumerControl.shutdown()
       emptySourceProbe.expectComplete()
diff --git 
a/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala 
b/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
index 5c33cf710..bfc78430e 100644
--- a/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
+++ b/jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala
@@ -388,7 +388,7 @@ class JmsBufferedAckConnectorsSpec extends 
JmsSharedServerSpec {
 
           }
         }
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       probe.requestNext(convertSpanToDuration(patienceConfig.timeout)) 
shouldBe Some(aMessage)
@@ -401,7 +401,7 @@ class JmsBufferedAckConnectorsSpec extends 
JmsSharedServerSpec {
       probe.expectComplete()
 
       // Consuming again should give us no elements, as msg was acked and 
therefore removed from the broker
-      val (emptyConsumerControl, emptySourceProbe) = 
source.toMat(TestSink.probe)(Keep.both).run()
+      val (emptyConsumerControl, emptySourceProbe) = 
source.toMat(TestSink())(Keep.both).run()
       emptySourceProbe.ensureSubscription().expectNoMessage()
       emptyConsumerControl.shutdown()
       emptySourceProbe.expectComplete()
diff --git 
a/jms/src/test/scala/org/apache/pekko/stream/connectors/jms/scaladsl/JmsAckConnectorsSpec.scala
 
b/jms/src/test/scala/org/apache/pekko/stream/connectors/jms/scaladsl/JmsAckConnectorsSpec.scala
index 10339609b..0cbdde347 100644
--- 
a/jms/src/test/scala/org/apache/pekko/stream/connectors/jms/scaladsl/JmsAckConnectorsSpec.scala
+++ 
b/jms/src/test/scala/org/apache/pekko/stream/connectors/jms/scaladsl/JmsAckConnectorsSpec.scala
@@ -425,7 +425,7 @@ class JmsAckConnectorsSpec extends JmsSpec {
             case _                    => None
           }
         }
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       probe.requestNext(convertSpanToDuration(patienceConfig.timeout)) 
shouldBe Some(aMessage)
@@ -438,7 +438,7 @@ class JmsAckConnectorsSpec extends JmsSpec {
       probe.expectComplete()
 
       // Consuming again should give us no elements, as msg was acked and 
therefore removed from the broker
-      val (emptyConsumerControl, emptySourceProbe) = 
source.toMat(TestSink.probe)(Keep.both).run()
+      val (emptyConsumerControl, emptySourceProbe) = 
source.toMat(TestSink())(Keep.both).run()
       emptySourceProbe.ensureSubscription().expectNoMessage()
       emptyConsumerControl.shutdown()
       emptySourceProbe.expectComplete()
diff --git 
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisFlowSpec.scala
 
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisFlowSpec.scala
index 40ca5f206..4eb0f8722 100644
--- 
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisFlowSpec.scala
+++ 
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisFlowSpec.scala
@@ -105,10 +105,9 @@ class KinesisFlowSpec extends AnyWordSpec with Matchers 
with KinesisMock with Lo
         .build()
 
     val (sourceProbe, sinkProbe) =
-      TestSource
-        .probe[PutRecordsRequestEntry]
+      TestSource[PutRecordsRequestEntry]()
         .via(KinesisFlow(streamName, settings))
-        .toMat(TestSink.probe[PutRecordsResultEntry])(Keep.both)
+        .toMat(TestSink[PutRecordsResultEntry]())(Keep.both)
         .run()
   }
 
@@ -129,10 +128,9 @@ class KinesisFlowSpec extends AnyWordSpec with Matchers 
with KinesisMock with Lo
       .map(i => (PutRecordsResultEntry.builder().build(), i))
 
     val (sourceProbe, sinkProbe) =
-      TestSource
-        .probe[(PutRecordsRequestEntry, Int)]
+      TestSource[(PutRecordsRequestEntry, Int)]()
         .via(KinesisFlow.withContext(streamName, settings))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
   }
 
diff --git 
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
 
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
index d87596998..78ada709e 100644
--- 
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
+++ 
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
@@ -282,7 +282,7 @@ class KinesisSchedulerSourceSpec
         .viaMat(Valve(switchMode))(Keep.right)
         .viaMat(KillSwitches.single)(Keep.both)
         .watchTermination()(Keep.both)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
     watch.onComplete(_ => lock.release())
@@ -449,13 +449,12 @@ class KinesisSchedulerSourceSpec
 
   private trait KinesisSchedulerCheckpointContext {
     val (sourceProbe, sinkProbe) =
-      TestSource
-        .probe[CommittableRecord]
+      TestSource[CommittableRecord]()
         .via(
           KinesisSchedulerSource
             .checkpointRecordsFlow(
               KinesisSchedulerCheckpointSettings(maxBatchSize = 100, 
maxBatchWait = 500.millis)))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
     val recordProcessor = new ShardProcessor(_ => ())
   }
diff --git 
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSourceSpec.scala
 
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSourceSpec.scala
index db6b5f4f3..3e56c58b9 100644
--- 
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSourceSpec.scala
+++ 
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSourceSpec.scala
@@ -54,7 +54,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers 
with KinesisMock with
           
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("1").toByteBuffer)).build(),
           
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("2").toByteBuffer)).build())
 
-        val probe = KinesisSource.basic(shardSettings, 
amazonKinesisAsync).runWith(TestSink.probe)
+        val probe = KinesisSource.basic(shardSettings, 
amazonKinesisAsync).runWith(TestSink())
 
         probe.requestNext().utf8String shouldEqual "1"
         probe.requestNext().utf8String shouldEqual "2"
@@ -72,7 +72,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers 
with KinesisMock with
           
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("1").toByteBuffer)).build(),
           
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("2").toByteBuffer)).build())
 
-        val probe = KinesisSource.basic(shardSettings, 
amazonKinesisAsync).runWith(TestSink.probe)
+        val probe = KinesisSource.basic(shardSettings, 
amazonKinesisAsync).runWith(TestSink())
 
         probe.request(2)
         probe.expectNext().utf8String shouldEqual "1"
@@ -94,7 +94,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers 
with KinesisMock with
           
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("5").toByteBuffer)).build(),
           
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("6").toByteBuffer)).build())
 
-        val probe = KinesisSource.basic(shardSettings, 
amazonKinesisAsync).runWith(TestSink.probe)
+        val probe = KinesisSource.basic(shardSettings, 
amazonKinesisAsync).runWith(TestSink())
 
         probe.request(1)
         probe.expectNext().utf8String shouldEqual "1"
@@ -124,7 +124,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers 
with KinesisMock with
           
Record.builder().data(SdkBytes.fromByteBuffer(ByteString("3").toByteBuffer)).build())
 
         val probe =
-          KinesisSource.basicMerge(mergeSettings, 
amazonKinesisAsync).map(_.utf8String).runWith(TestSink.probe)
+          KinesisSource.basicMerge(mergeSettings, 
amazonKinesisAsync).map(_.utf8String).runWith(TestSink())
 
         probe.request(6)
         probe.expectNextUnordered("1", "1", "2", "2", "3", "3")
@@ -137,7 +137,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers 
with KinesisMock with
         override def records =
           
util.Arrays.asList(Record.builder().data(SdkBytes.fromByteBuffer(ByteString("1").toByteBuffer)).build())
 
-        val probe = KinesisSource.basic(shardSettings, 
amazonKinesisAsync).runWith(TestSink.probe)
+        val probe = KinesisSource.basic(shardSettings, 
amazonKinesisAsync).runWith(TestSink())
 
         probe.requestNext().utf8String shouldEqual "1"
         nextShardIterator.set(null)
@@ -150,7 +150,7 @@ class KinesisSourceSpec extends AnyWordSpec with Matchers 
with KinesisMock with
 
     "fail with error when GetStreamRequest fails" in assertAllStagesStopped {
       new KinesisSpecContext with WithGetShardIteratorSuccess with 
WithGetRecordsFailure {
-        val probe = KinesisSource.basic(shardSettings, 
amazonKinesisAsync).runWith(TestSink.probe)
+        val probe = KinesisSource.basic(shardSettings, 
amazonKinesisAsync).runWith(TestSink())
         probe.request(1)
         probe.expectError() shouldBe an[KinesisErrors.GetRecordsError]
         probe.cancel()
diff --git 
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesisfirehose/KinesisFirehoseFlowSpec.scala
 
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesisfirehose/KinesisFirehoseFlowSpec.scala
index 4f9c8d8b9..81bac5187 100644
--- 
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesisfirehose/KinesisFirehoseFlowSpec.scala
+++ 
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesisfirehose/KinesisFirehoseFlowSpec.scala
@@ -81,10 +81,9 @@ class KinesisFirehoseFlowSpec extends AnyWordSpec with 
Matchers with KinesisFire
     val requestError = new RuntimeException("kinesisfirehose-error")
 
     val (sourceProbe, sinkProbe) =
-      TestSource
-        .probe[Record]
+      TestSource[Record]()
         .via(KinesisFirehoseFlow(streamName, settings))
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
   }
 
diff --git 
a/mqtt-streaming/src/test/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/MqttFrameStageSpec.scala
 
b/mqtt-streaming/src/test/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/MqttFrameStageSpec.scala
index a9d47f05a..f2d4ced68 100644
--- 
a/mqtt-streaming/src/test/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/MqttFrameStageSpec.scala
+++ 
b/mqtt-streaming/src/test/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/MqttFrameStageSpec.scala
@@ -18,8 +18,7 @@ import org.apache.pekko
 import pekko.actor.ActorSystem
 import pekko.stream.connectors.testkit.scaladsl.LogCapturing
 import pekko.stream.scaladsl.{ Keep, Source }
-import pekko.stream.testkit.javadsl.TestSink
-import pekko.stream.testkit.scaladsl.TestSource
+import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
 import pekko.testkit.TestKit
 import pekko.util.ByteString
 import org.scalatest.BeforeAndAfterAll
@@ -41,7 +40,7 @@ class MqttFrameStageSpec
       Source
         .single(bytes)
         .via(new MqttFrameStage(MaxPacketSize))
-        .runWith(TestSink.probe(system))
+        .runWith(TestSink()(system))
         .request(1)
         .expectNext(bytes)
         .expectComplete()
@@ -52,7 +51,7 @@ class MqttFrameStageSpec
       Source
         .single(bytes)
         .via(new MqttFrameStage(MaxPacketSize))
-        .runWith(TestSink.probe(system))
+        .runWith(TestSink()(system))
         .request(1)
         .expectNext(bytes)
         .expectComplete()
@@ -63,7 +62,7 @@ class MqttFrameStageSpec
       Source
         .single(bytes ++ bytes)
         .via(new MqttFrameStage(MaxPacketSize))
-        .runWith(TestSink.probe(system))
+        .runWith(TestSink()(system))
         .request(2)
         .expectNext(bytes, bytes)
         .expectComplete()
@@ -74,10 +73,9 @@ class MqttFrameStageSpec
       val bytes1 = 
ByteString.newBuilder.putByte(1).putBytes(Array.ofDim(0x80)).result()
 
       val (pub, sub) =
-        TestSource
-          .probe(system)
+        TestSource()(system)
           .via(new MqttFrameStage(MaxPacketSize * 2))
-          .toMat(TestSink.probe(system))(Keep.both)
+          .toMat(TestSink()(system))(Keep.both)
           .run()
 
       pub.sendNext(bytes0)
@@ -97,7 +95,7 @@ class MqttFrameStageSpec
         Source
           .single(bytes)
           .via(new MqttFrameStage(MaxPacketSize))
-          .runWith(TestSink.probe(system))
+          .runWith(TestSink()(system))
           .request(1)
           .expectError()
       ex.getMessage shouldBe s"Max packet size of $MaxPacketSize exceeded with 
${MaxPacketSize + 2}"
diff --git a/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java 
b/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java
index 9b606ff36..c4d00414f 100644
--- a/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java
+++ b/mqtt/src/test/java/docs/javadsl/MqttSourceTest.java
@@ -314,7 +314,7 @@ public class MqttSourceTest {
         MqttSource.atMostOnce(settings1, subscriptions, bufferSize);
 
     Pair<CompletionStage<Done>, TestSubscriber.Probe<MqttMessage>> result2 =
-        source1.toMat(TestSink.probe(system), Keep.both()).run(system);
+        source1.toMat(TestSink.create(system), Keep.both()).run(system);
 
     // Ensure that the connection made it all the way to the server by waiting 
until it receives a
     // message
diff --git a/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala 
b/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
index d92e59763..d1fd588b8 100644
--- a/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
+++ b/mqtt/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
@@ -321,7 +321,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec") 
{
             .withBroker(s"tcp://localhost:$proxyPort"),
           MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
           8)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       // Ensure that the connection made it all the way to the server by 
waiting until it receives a message
@@ -385,7 +385,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec") 
{
             MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
             8))
 
-      val (subscribed, probe) = source1.toMat(TestSink.probe)(Keep.both).run()
+      val (subscribed, probe) = source1.toMat(TestSink())(Keep.both).run()
 
       // Ensure that the connection made it all the way to the server by 
waiting until it receives a message
       Await.ready(subscribed, timeout)
@@ -435,7 +435,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec") 
{
           MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
           8)
         .via(sharedKillSwitch.flow)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
       Await.ready(killSwitch, timeout)
 
@@ -471,7 +471,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec") 
{
             .withOfflinePersistenceSettings(bufferSize = 1234),
           MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
           8)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       // Ensure that the connection made it all the way to the server by 
waiting until it receives a message
diff --git a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java 
b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
index 3d769ba31..395a88aed 100644
--- a/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
+++ b/mqttv5/src/test/java/docs/javadsl/MqttSourceTest.java
@@ -317,7 +317,7 @@ public class MqttSourceTest {
                 MqttSource.atMostOnce(settings1, subscriptions, bufferSize);
 
         Pair<CompletionStage<Done>, TestSubscriber.Probe<MqttMessage>> result2 
=
-                source1.toMat(TestSink.probe(system), Keep.both()).run(system);
+                source1.toMat(TestSink.create(system), 
Keep.both()).run(system);
 
         // Ensure that the connection made it all the way to the server by 
waiting until it receives a
         // message
diff --git a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala 
b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
index 5ae7b1b3a..bedd05ab3 100644
--- a/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
+++ b/mqttv5/src/test/scala/docs/scaladsl/MqttSourceSpec.scala
@@ -331,7 +331,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec") 
{
             .withBroker(s"tcp://localhost:$proxyPort"),
           MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
           8)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       // Ensure that the connection made it all the way to the server by 
waiting until it receives a message
@@ -396,7 +396,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec") 
{
             MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
             8))
 
-      val (subscribed, probe) = source1.toMat(TestSink.probe)(Keep.both).run()
+      val (subscribed, probe) = source1.toMat(TestSink())(Keep.both).run()
 
       // Ensure that the connection made it all the way to the server by 
waiting until it receives a message
       Await.ready(subscribed, timeout)
@@ -446,7 +446,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec") 
{
           MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
           8)
         .via(sharedKillSwitch.flow)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
       Await.ready(killSwitch, timeout)
 
@@ -482,7 +482,7 @@ class MqttSourceSpec extends MqttSpecBase("MqttSourceSpec") 
{
             .withOfflinePersistenceSettings(bufferSize = 1234),
           MqttSubscriptions(topic1, MqttQoS.AtLeastOnce),
           8)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       // Ensure that the connection made it all the way to the server by 
waiting until it receives a message
diff --git a/project/PekkoCoreDependency.scala 
b/project/PekkoCoreDependency.scala
index 1ad1bac11..61d5bd7d0 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
 object PekkoCoreDependency extends PekkoDependency {
   override val checkProject: String = "pekko-cluster-sharding-typed"
   override val module: Option[String] = None
-  override val currentVersion: String = "1.2.1"
+  override val currentVersion: String = "1.3.0"
 }
diff --git 
a/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/SnsPublishMockingSpec.scala
 
b/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/SnsPublishMockingSpec.scala
index b4ffcb319..fef0d2b45 100644
--- 
a/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/SnsPublishMockingSpec.scala
+++ 
b/sns/src/test/scala/org/apache/pekko/stream/connectors/sns/SnsPublishMockingSpec.scala
@@ -38,7 +38,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with 
DefaultTestContext with Mat
     
when(snsClient.publish(meq(publishRequest))).thenReturn(CompletableFuture.completedFuture(publishResult))
 
     val (probe, future) =
-      
TestSource.probe[PublishRequest].via(SnsPublisher.publishFlow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+      
TestSource[PublishRequest]().via(SnsPublisher.publishFlow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
 
     
probe.sendNext(PublishRequest.builder().message("sns-message").build()).sendComplete()
 
@@ -53,7 +53,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with 
DefaultTestContext with Mat
     
when(snsClient.publish(any[PublishRequest]())).thenReturn(CompletableFuture.completedFuture(publishResult))
 
     val (probe, future) =
-      
TestSource.probe[PublishRequest].via(SnsPublisher.publishFlow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+      
TestSource[PublishRequest]().via(SnsPublisher.publishFlow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
 
     probe
       .sendNext(PublishRequest.builder().message("sns-message-1").build())
@@ -79,7 +79,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with 
DefaultTestContext with Mat
     
when(snsClient.publish(any[PublishRequest]())).thenReturn(CompletableFuture.completedFuture(publishResult))
 
     val (probe, future) =
-      
TestSource.probe[PublishRequest].via(SnsPublisher.publishFlow()).toMat(Sink.seq)(Keep.both).run()
+      
TestSource[PublishRequest]().via(SnsPublisher.publishFlow()).toMat(Sink.seq)(Keep.both).run()
 
     probe
       
.sendNext(PublishRequest.builder().message("sns-message-1").topicArn("topic-arn-1").build())
@@ -105,7 +105,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with 
DefaultTestContext with Mat
 
     
when(snsClient.publish(meq(publishRequest))).thenReturn(CompletableFuture.completedFuture(publishResult))
 
-    val (probe, future) = 
TestSource.probe[String].via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+    val (probe, future) = 
TestSource[String]().via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
     probe.sendNext("sns-message").sendComplete()
 
     Await.result(future, 1.second) mustBe publishResult :: Nil
@@ -117,7 +117,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with 
DefaultTestContext with Mat
 
     
when(snsClient.publish(any[PublishRequest]())).thenReturn(CompletableFuture.completedFuture(publishResult))
 
-    val (probe, future) = 
TestSource.probe[String].via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+    val (probe, future) = 
TestSource[String]().via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
     
probe.sendNext("sns-message-1").sendNext("sns-message-2").sendNext("sns-message-3").sendComplete()
 
     Await.result(future, 1.second) mustBe publishResult :: publishResult :: 
publishResult :: Nil
@@ -140,7 +140,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with 
DefaultTestContext with Mat
 
     when(snsClient.publish(meq(publishRequest))).thenReturn(promise)
 
-    val (probe, future) = 
TestSource.probe[String].via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+    val (probe, future) = 
TestSource[String]().via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
     probe.sendNext("sns-message").sendComplete()
 
     a[RuntimeException] should be thrownBy {
@@ -153,7 +153,7 @@ class SnsPublishMockingSpec extends AnyFlatSpec with 
DefaultTestContext with Mat
   it should "fail stage if upstream failure occurs" in {
     case class MyCustomException(message: String) extends Exception(message)
 
-    val (probe, future) = 
TestSource.probe[String].via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
+    val (probe, future) = 
TestSource[String]().via(SnsPublisher.flow("topic-arn")).toMat(Sink.seq)(Keep.both).run()
     probe.sendError(MyCustomException("upstream failure"))
 
     a[MyCustomException] should be thrownBy {
diff --git 
a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishSinkSpec.scala
 
b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishSinkSpec.scala
index 8bb3f8277..854bcd3bb 100644
--- 
a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishSinkSpec.scala
+++ 
b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsPublishSinkSpec.scala
@@ -41,7 +41,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers with DefaultTestConte
     when(sqsClient.sendMessage(any[SendMessageRequest]))
       .thenReturn(CompletableFuture.completedFuture(SendMessageResponse.builder().build()))
 
-    val (probe, future) = TestSource.probe[String].toMat(SqsPublishSink("notused"))(Keep.both).run()
+    val (probe, future) = TestSource[String]().toMat(SqsPublishSink("notused"))(Keep.both).run()
     probe.sendNext("notused").sendComplete()
     Await.result(future, 1.second) shouldBe Done
 
@@ -58,7 +58,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers with DefaultTestConte
           override def get(): SendMessageResponse = throw new RuntimeException("Fake client error")
         }))
 
-    val (probe, future) = TestSource.probe[String].toMat(SqsPublishSink("notused"))(Keep.both).run()
+    val (probe, future) = TestSource[String]().toMat(SqsPublishSink("notused"))(Keep.both).run()
     probe.sendNext("notused").sendComplete()
 
     a[RuntimeException] should be thrownBy {
@@ -97,7 +97,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers with DefaultTestConte
 
   it should "failure the promise on upstream failure" in {
     implicit val sqsClient: SqsAsyncClient = mock[SqsAsyncClient]
-    val (probe, future) = TestSource.probe[String].toMat(SqsPublishSink("notused"))(Keep.both).run()
+    val (probe, future) = TestSource[String]().toMat(SqsPublishSink("notused"))(Keep.both).run()
 
     probe.sendError(new RuntimeException("Fake upstream failure"))
 
@@ -112,7 +112,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers with DefaultTestConte
     when(sqsClient.sendMessage(any[SendMessageRequest]))
       .thenReturn(CompletableFuture.completedFuture(SendMessageResponse.builder().build()))
 
-    val (probe, future) = TestSource.probe[String].toMat(SqsPublishSink("notused"))(Keep.both).run()
+    val (probe, future) = TestSource[String]().toMat(SqsPublishSink("notused"))(Keep.both).run()
     probe
       .sendNext("test-101")
       .sendNext("test-102")
@@ -138,7 +138,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers with DefaultTestConte
               SendMessageBatchResultEntry.builder().id("0").messageId(UUID.randomUUID().toString).build())
             .build()))
 
-    val (probe, future) = TestSource.probe[String].toMat(SqsPublishSink.grouped("notused"))(Keep.both).run()
+    val (probe, future) = TestSource[String]().toMat(SqsPublishSink.grouped("notused"))(Keep.both).run()
     probe.sendNext("notused").sendComplete()
     Await.result(future, 1.second) shouldBe Done
 
@@ -164,7 +164,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers with DefaultTestConte
 
     val settings = SqsPublishGroupedSettings.create().withMaxBatchSize(5)
 
-    val (probe, future) = TestSource.probe[String].toMat(SqsPublishSink.grouped("notused", settings))(Keep.both).run()
+    val (probe, future) = TestSource[String]().toMat(SqsPublishSink.grouped("notused", settings))(Keep.both).run()
     probe
       .sendNext("notused - 1")
       .sendNext("notused - 2")
@@ -200,7 +200,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers with DefaultTestConte
             .failed(BatchResultErrorEntry.builder().id("4").message("a very weird error just happened").build())
             .build()))
 
-    val (probe, future) = TestSource.probe[String].toMat(SqsPublishSink.grouped("notused"))(Keep.both).run()
+    val (probe, future) = TestSource[String]().toMat(SqsPublishSink.grouped("notused"))(Keep.both).run()
     probe
       .sendNext("notused - 1")
       .sendNext("notused - 2")
@@ -226,7 +226,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers with DefaultTestConte
       }))
 
     val settings = SqsPublishGroupedSettings().withMaxBatchSize(5)
-    val (probe, future) = TestSource.probe[String].toMat(SqsPublishSink.grouped("notused", settings))(Keep.both).run()
+    val (probe, future) = TestSource[String]().toMat(SqsPublishSink.grouped("notused", settings))(Keep.both).run()
     probe
       .sendNext("notused - 1")
       .sendNext("notused - 2")
@@ -258,7 +258,7 @@ class SqsPublishSinkSpec extends AnyFlatSpec with Matchers with DefaultTestConte
               SendMessageBatchResultEntry.builder().id("3").messageId(UUID.randomUUID().toString).build())
             .build()))
 
-    val (probe, future) = TestSource.probe[Seq[String]].toMat(SqsPublishSink.batch("notused"))(Keep.both).run()
+    val (probe, future) = TestSource[Seq[String]]().toMat(SqsPublishSink.batch("notused"))(Keep.both).run()
     probe
       .sendNext(
         Seq(
diff --git a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsSourceMockSpec.scala b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsSourceMockSpec.scala
index 0ebfd6f74..6a6d3fdb3 100644
--- a/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsSourceMockSpec.scala
+++ b/sqs/src/test/scala/org/apache/pekko/stream/connectors/sqs/scaladsl/SqsSourceMockSpec.scala
@@ -53,7 +53,7 @@ class SqsSourceMockSpec extends AnyFlatSpec with Matchers with DefaultTestContex
 
     val probe = SqsSource(
       "url",
-      SqsSourceSettings.Defaults.withMaxBufferSize(10)).runWith(TestSink.probe[Message])
+      SqsSourceSettings.Defaults.withMaxBufferSize(10)).runWith(TestSink[Message]())
 
     defaultMessages.foreach(probe.requestNext)
 
@@ -90,7 +90,7 @@ class SqsSourceMockSpec extends AnyFlatSpec with Matchers with DefaultTestContex
     val probe = SqsSource(
       "url",
       SqsSourceSettings.Defaults.withMaxBufferSize(
-        SqsSourceSettings.Defaults.maxBatchSize * bufferToBatchRatio)).runWith(TestSink.probe[Message])
+        SqsSourceSettings.Defaults.maxBatchSize * bufferToBatchRatio)).runWith(TestSink[Message]())
 
     Thread.sleep(timeout.toMillis * (bufferToBatchRatio + 1))
 
@@ -141,7 +141,7 @@ class SqsSourceMockSpec extends AnyFlatSpec with Matchers with DefaultTestContex
       SqsSourceSettings.Defaults
         .withMaxBufferSize(10)
         .withParallelRequests(10)
-        .withWaitTime(timeout)).runWith(TestSink.probe[Message])
+        .withWaitTime(timeout)).runWith(TestSink[Message]())
 
     (1 to firstWithDataCount * 10).foreach(_ => probe.requestNext())
 
diff --git a/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala b/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
index 336189201..53ac2f215 100644
--- a/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
+++ b/text/src/test/scala/org/apache/pekko/stream/connectors/text/scaladsl/CharsetCodingFlowsSpec.scala
@@ -135,8 +135,7 @@ class CharsetCodingFlowsSpec
     }
 
     def verifyByteSends(charsetIn: Charset, charsetOut: Charset, in: String) = {
-      val (source, sink) = TestSource
-        .probe[ByteString]
+      val (source, sink) = TestSource[ByteString]()
         .via(TextFlow.transcoding(charsetIn, charsetOut))
         .map(_.decodeString(charsetOut))
         .toMat(Sink.seq)(Keep.both)
@@ -163,10 +162,9 @@ class CharsetCodingFlowsSpec
     }
 
     "complete" in {
-      val (source, sink) = TestSource
-        .probe[ByteString]
+      val (source, sink) = TestSource[ByteString]()
         .via(TextFlow.transcoding(StandardCharsets.UTF_8, StandardCharsets.UTF_8))
-        .toMat(TestSink.probe[ByteString])(Keep.both)
+        .toMat(TestSink[ByteString]())(Keep.both)
         .run()
       source.sendNext(ByteString("eins,zwei,drei"))
       sink.request(3)
diff --git a/udp/src/test/java/docs/javadsl/UdpTest.java b/udp/src/test/java/docs/javadsl/UdpTest.java
index 346b9d175..90dd503cf 100644
--- a/udp/src/test/java/docs/javadsl/UdpTest.java
+++ b/udp/src/test/java/docs/javadsl/UdpTest.java
@@ -72,9 +72,9 @@ public class UdpTest {
             Pair<TestPublisher.Probe<Datagram>, CompletionStage<InetSocketAddress>>,
             TestSubscriber.Probe<Datagram>>
         materialized =
-            TestSource.<Datagram>probe(system)
+            TestSource.<Datagram>create(system)
                 .viaMat(bindFlow, Keep.both())
-                .toMat(TestSink.probe(system), Keep.both())
+                .toMat(TestSink.create(system), Keep.both())
                 .run(system);
 
     {
@@ -140,9 +140,9 @@ public class UdpTest {
             Pair<TestPublisher.Probe<Datagram>, CompletionStage<InetSocketAddress>>,
             TestSubscriber.Probe<Datagram>>
         materialized =
-            TestSource.<Datagram>probe(system)
+            TestSource.<Datagram>create(system)
                 .viaMat(bindFlow, Keep.both())
-                .toMat(TestSink.probe(system), Keep.both())
+                .toMat(TestSink.create(system), Keep.both())
                 .run(system);
 
     {
diff --git a/udp/src/test/scala/docs/scaladsl/UdpSpec.scala b/udp/src/test/scala/docs/scaladsl/UdpSpec.scala
index 8f58affa6..2f87b71bc 100644
--- a/udp/src/test/scala/docs/scaladsl/UdpSpec.scala
+++ b/udp/src/test/scala/docs/scaladsl/UdpSpec.scala
@@ -62,10 +62,9 @@ class UdpSpec
         Udp.bindFlow(bindToLocal)
       // #bind-flow
 
-      val ((pub, bound), sub) = TestSource
-        .probe[Datagram](system)
+      val ((pub, bound), sub) = TestSource[Datagram]()(system)
         .viaMat(bindFlow)(Keep.both)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       val destination = bound.futureValue
@@ -103,10 +102,9 @@ class UdpSpec
       val bindFlow: Flow[Datagram, Datagram, Future[InetSocketAddress]] =
         Udp.bindFlow(bindToLocal, List(UdpSO.broadcast(true)))
 
-      val ((pub, bound), sub) = TestSource
-        .probe[Datagram](system)
+      val ((pub, bound), sub) = TestSource[Datagram]()(system)
         .viaMat(bindFlow)(Keep.both)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       val destination = bound.futureValue
@@ -133,16 +131,14 @@ class UdpSpec
     }
 
     "ping-pong messages" in {
-      val ((pub1, bound1), sub1) = TestSource
-        .probe[Datagram](system)
+      val ((pub1, bound1), sub1) = TestSource[Datagram]()(system)
         .viaMat(Udp.bindFlow(bindToLocal))(Keep.both)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
-      val ((pub2, bound2), sub2) = TestSource
-        .probe[Datagram](system)
+      val ((pub2, bound2), sub2) = TestSource[Datagram]()(system)
         .viaMat(Udp.bindFlow(bindToLocal))(Keep.both)
-        .toMat(TestSink.probe)(Keep.both)
+        .toMat(TestSink())(Keep.both)
         .run()
 
       val boundAddress1 = bound1.futureValue


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to