This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new 7419fe54f test with pekko 2.0.0-M1 (#1379)
7419fe54f is described below

commit 7419fe54fd85d4a205b1fe85e885616039d50e05
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Feb 4 12:44:24 2026 +0100

    test with pekko 2.0.0-M1 (#1379)
    
    * test with pekko 2.0.0-M1
    
    * gzipDecompress
    
    * Update build.sbt
    
    * Remove Apache Maven Staging Repo from resolvers
    
    * watchTermination
---
 .../stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala |  2 +-
 build.sbt                                                    |  5 ++++-
 file/src/test/java/docs/javadsl/NestedTarReaderTest.java     |  2 +-
 file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala   |  2 +-
 .../connectors/kinesis/KinesisSchedulerSourceSpec.scala      |  2 +-
 .../connectors/mqtt/streaming/scaladsl/MqttSession.scala     | 12 ++++++------
 project/PekkoCoreDependency.scala                            |  2 +-
 project/PekkoHttpDependency.scala                            |  2 +-
 project/plugins.sbt                                          |  3 ++-
 .../unixdomainsocket/impl/UnixDomainSocketImpl.scala         |  4 ++--
 10 files changed, 20 insertions(+), 16 deletions(-)

diff --git 
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
 
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
index 6505b24f7..c58542cdb 100644
--- 
a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
+++ 
b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
@@ -147,7 +147,7 @@ class AmqpConnectorsSpec extends AmqpSpec with 
ScalaCheckDrivenPropertyChecks {
 
       Source
         .single(outgoingMessage)
-        .watchTermination()(Keep.right)
+        .watchTermination(Keep.right)
         .to(AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider)))
         .run()
         .futureValue shouldBe Done
diff --git a/build.sbt b/build.sbt
index ce87a082c..f6ae671c5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -294,7 +294,10 @@ lazy val ironmq = pekkoConnectorProject(
   "ironmq",
   "ironmq",
   Dependencies.IronMq,
-  Test / fork := true)
+  Test / fork := true,
+  // org.mdedetrich libs don't have a release that supports Pekko 2 yet
+  // so we need to disable eviction warnings for now
+  evictionErrorLevel := Level.Info)
 
 lazy val jms = pekkoConnectorProject("jms", "jms", Dependencies.Jms)
 
diff --git a/file/src/test/java/docs/javadsl/NestedTarReaderTest.java 
b/file/src/test/java/docs/javadsl/NestedTarReaderTest.java
index 50a41c463..0e2e61f9f 100644
--- a/file/src/test/java/docs/javadsl/NestedTarReaderTest.java
+++ b/file/src/test/java/docs/javadsl/NestedTarReaderTest.java
@@ -111,7 +111,7 @@ public class NestedTarReaderTest {
                                     .substring(0, TARGZ_EXT.length() - 2)));
                 readMetadata =
                     source
-                        .via(Compression.gunzip(MAX_GUNZIP_CHUNK_SIZE))
+                        .via(Compression.gzipDecompress(MAX_GUNZIP_CHUNK_SIZE))
                         .via(unTarFlow(targetSubDir, system))
                         .runWith(Sink.seq(), system);
               } else {
diff --git a/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala 
b/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
index 22fbf6a52..fb1676684 100644
--- a/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
+++ b/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
@@ -288,7 +288,7 @@ class LogRotatorSinkSpec
       val (contents, sizes) = readUpFileBytesAndSizesThenClean(files())
       sizes should contain theSameElementsAs Seq(51L, 51L, 51L)
       val uncompressed = Future.sequence(contents.map { c =>
-        
Source.single(c).via(Compression.gunzip()).map(_.utf8String).runWith(Sink.head)
+        
Source.single(c).via(Compression.gzipDecompress()).map(_.utf8String).runWith(Sink.head)
       })
       uncompressed.futureValue should contain theSameElementsAs 
TestLines.sliding(2, 2).map(_.mkString("")).toList
     }
diff --git 
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
 
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
index 7dab718c0..70b72b284 100644
--- 
a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
+++ 
b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
@@ -284,7 +284,7 @@ class KinesisSchedulerSourceSpec
           backpressureTimeout = backpressureTimeout))
         .viaMat(Valve(switchMode))(Keep.right)
         .viaMat(KillSwitches.single)(Keep.both)
-        .watchTermination()(Keep.both)
+        .watchTermination(Keep.both)
         .toMat(TestSink())(Keep.both)
         .run()
 
diff --git 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala
 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala
index 7af83df97..f5da6ad53 100644
--- 
a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala
+++ 
b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala
@@ -195,7 +195,7 @@ final class ActorMqttClientSession(settings: 
MqttSessionSettings)(implicit syste
         Future.successful(
           Flow[Command[A]]
             .watch(clientConnector.toClassic)
-            .watchTermination() {
+            .watchTermination {
               case (_, terminated) =>
                 terminated.onComplete {
                   case Failure(_: WatchedActorTerminatedException) =>
@@ -224,7 +224,7 @@ final class ActorMqttClientSession(settings: 
MqttSessionSettings)(implicit syste
                       case Subscriber.SubscribeFailed    => 
ActorMqttClientSession.SubscribeFailed
                       case ClientConnector.PingFailed    => 
ActorMqttClientSession.PingFailed
                     }
-                      .watchTermination() { (_, done) =>
+                      .watchTermination { (_, done) =>
                         done.onComplete {
                           case Success(_) => killSwitch.shutdown()
                           case Failure(t) => killSwitch.abort(t)
@@ -303,7 +303,7 @@ final class ActorMqttClientSession(settings: 
MqttSessionSettings)(implicit syste
   private[streaming] override def eventFlow[A](connectionId: ByteString): 
EventFlow[A] =
     Flow[ByteString]
       .watch(clientConnector.toClassic)
-      .watchTermination() {
+      .watchTermination {
         case (_, terminated) =>
           terminated.onComplete {
             case Failure(_: WatchedActorTerminatedException) =>
@@ -531,7 +531,7 @@ final class ActorMqttServerSession(settings: 
MqttSessionSettings)(implicit syste
         Future.successful(
           Flow[Command[A]]
             .watch(serverConnector.toClassic)
-            .watchTermination() {
+            .watchTermination {
               case (_, terminated) =>
                 terminated.onComplete {
                   case Failure(_: WatchedActorTerminatedException) =>
@@ -560,7 +560,7 @@ final class ActorMqttServerSession(settings: 
MqttSessionSettings)(implicit syste
                     }.mapError {
                       case ServerConnector.PingFailed => 
ActorMqttServerSession.PingFailed
                     }
-                      .watchTermination() { (_, done) =>
+                      .watchTermination { (_, done) =>
                         done.onComplete {
                           case Success(_) => killSwitch.shutdown()
                           case Failure(t) => killSwitch.abort(t)
@@ -657,7 +657,7 @@ final class ActorMqttServerSession(settings: 
MqttSessionSettings)(implicit syste
   override def eventFlow[A](connectionId: ByteString): EventFlow[A] =
     Flow[ByteString]
       .watch(serverConnector.toClassic)
-      .watchTermination() {
+      .watchTermination {
         case (_, terminated) =>
           terminated.onComplete {
             case Failure(_: WatchedActorTerminatedException) =>
diff --git a/project/PekkoCoreDependency.scala 
b/project/PekkoCoreDependency.scala
index d3672d829..c127cb61c 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
 object PekkoCoreDependency extends PekkoDependency {
   override val checkProject: String = "pekko-cluster-sharding-typed"
   override val module: Option[String] = None
-  override val currentVersion: String = "1.4.0"
+  override val currentVersion: String = "2.0.0-M1"
 }
diff --git a/project/PekkoHttpDependency.scala 
b/project/PekkoHttpDependency.scala
index b516636a8..d380824f3 100644
--- a/project/PekkoHttpDependency.scala
+++ b/project/PekkoHttpDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
 object PekkoHttpDependency extends PekkoDependency {
   override val checkProject: String = "pekko-http-testkit"
   override val module: Option[String] = Some("http")
-  override val currentVersion: String = "1.3.0"
+  override val currentVersion: String = "2.0.0-M1"
 }
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 01bf5d9fe..54ab55134 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -29,7 +29,8 @@ addSbtPlugin("com.lightbend.paradox" % "sbt-paradox" % 
"0.10.7")
 addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.6.1")
 addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % 
"3.0.2")
 // Pekko gRPC -- sync with PekkoGrpcBinaryVersion in Dependencies.scala
-addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.2.0")
+resolvers += Resolver.ApacheMavenStagingRepo
+addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "2.0.0-M1")
 // templating
 addSbtPlugin("com.github.sbt" % "sbt-boilerplate" % "0.8.0")
 // Run JUnit 5 tests with sbt
diff --git 
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
 
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
index b5cbf5bfe..569d211e6 100644
--- 
a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
+++ 
b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
@@ -340,7 +340,7 @@ private[unixdomainsocket] object UnixDomainSocketImpl {
           sel.wakeup()
           sent.future.map(_ => bytes)
         }
-        .watchTermination() {
+        .watchTermination {
           case (_, done) =>
             done.onComplete { _ =>
               sendReceiveContext.send = if (halfClose) {
@@ -401,7 +401,7 @@ private[unixdomainsocket] abstract class 
UnixDomainSocketImpl(system: ExtendedAc
           .map {
             case (_, source) =>
               source
-                .watchTermination() { (mat, done) =>
+                .watchTermination { (mat, done) =>
                   done
                     .andThen {
                       case _ =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to