This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new 7419fe54f test with pekko 2.0.0-M1 (#1379)
7419fe54f is described below
commit 7419fe54fd85d4a205b1fe85e885616039d50e05
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Feb 4 12:44:24 2026 +0100
test with pekko 2.0.0-M1 (#1379)
* test with pekko 2.0.0-M1
* gzipDecompress
* Update build.sbt
* Remove Apache Maven Staging Repo from resolvers
* watchTermination
---
.../stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala | 2 +-
build.sbt | 5 ++++-
file/src/test/java/docs/javadsl/NestedTarReaderTest.java | 2 +-
file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala | 2 +-
.../connectors/kinesis/KinesisSchedulerSourceSpec.scala | 2 +-
.../connectors/mqtt/streaming/scaladsl/MqttSession.scala | 12 ++++++------
project/PekkoCoreDependency.scala | 2 +-
project/PekkoHttpDependency.scala | 2 +-
project/plugins.sbt | 3 ++-
.../unixdomainsocket/impl/UnixDomainSocketImpl.scala | 4 ++--
10 files changed, 20 insertions(+), 16 deletions(-)
diff --git a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
index 6505b24f7..c58542cdb 100644
--- a/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
+++ b/amqp/src/test/scala/org/apache/pekko/stream/connectors/amqp/scaladsl/AmqpConnectorsSpec.scala
@@ -147,7 +147,7 @@ class AmqpConnectorsSpec extends AmqpSpec with ScalaCheckDrivenPropertyChecks {
Source
.single(outgoingMessage)
- .watchTermination()(Keep.right)
+ .watchTermination(Keep.right)
.to(AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider)))
.run()
.futureValue shouldBe Done
diff --git a/build.sbt b/build.sbt
index ce87a082c..f6ae671c5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -294,7 +294,10 @@ lazy val ironmq = pekkoConnectorProject(
"ironmq",
"ironmq",
Dependencies.IronMq,
- Test / fork := true)
+ Test / fork := true,
+ // org.mdedetrich libs don't have a release that supports Pekko 2 yet
+ // so we need to disable eviction warnings for now
+ evictionErrorLevel := Level.Info)
lazy val jms = pekkoConnectorProject("jms", "jms", Dependencies.Jms)
diff --git a/file/src/test/java/docs/javadsl/NestedTarReaderTest.java b/file/src/test/java/docs/javadsl/NestedTarReaderTest.java
index 50a41c463..0e2e61f9f 100644
--- a/file/src/test/java/docs/javadsl/NestedTarReaderTest.java
+++ b/file/src/test/java/docs/javadsl/NestedTarReaderTest.java
@@ -111,7 +111,7 @@ public class NestedTarReaderTest {
.substring(0, TARGZ_EXT.length() - 2)));
readMetadata =
source
- .via(Compression.gunzip(MAX_GUNZIP_CHUNK_SIZE))
+ .via(Compression.gzipDecompress(MAX_GUNZIP_CHUNK_SIZE))
.via(unTarFlow(targetSubDir, system))
.runWith(Sink.seq(), system);
} else {
diff --git a/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala b/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
index 22fbf6a52..fb1676684 100644
--- a/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
+++ b/file/src/test/scala/docs/scaladsl/LogRotatorSinkSpec.scala
@@ -288,7 +288,7 @@ class LogRotatorSinkSpec
val (contents, sizes) = readUpFileBytesAndSizesThenClean(files())
sizes should contain theSameElementsAs Seq(51L, 51L, 51L)
val uncompressed = Future.sequence(contents.map { c =>
- Source.single(c).via(Compression.gunzip()).map(_.utf8String).runWith(Sink.head)
+ Source.single(c).via(Compression.gzipDecompress()).map(_.utf8String).runWith(Sink.head)
})
uncompressed.futureValue should contain theSameElementsAs
TestLines.sliding(2, 2).map(_.mkString("")).toList
}
diff --git a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
index 7dab718c0..70b72b284 100644
--- a/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
+++ b/kinesis/src/test/scala/org/apache/pekko/stream/connectors/kinesis/KinesisSchedulerSourceSpec.scala
@@ -284,7 +284,7 @@ class KinesisSchedulerSourceSpec
backpressureTimeout = backpressureTimeout))
.viaMat(Valve(switchMode))(Keep.right)
.viaMat(KillSwitches.single)(Keep.both)
- .watchTermination()(Keep.both)
+ .watchTermination(Keep.both)
.toMat(TestSink())(Keep.both)
.run()
diff --git a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala
index 7af83df97..f5da6ad53 100644
--- a/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala
+++ b/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/MqttSession.scala
@@ -195,7 +195,7 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit syste
Future.successful(
Flow[Command[A]]
.watch(clientConnector.toClassic)
- .watchTermination() {
+ .watchTermination {
case (_, terminated) =>
terminated.onComplete {
case Failure(_: WatchedActorTerminatedException) =>
@@ -224,7 +224,7 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit syste
case Subscriber.SubscribeFailed =>
ActorMqttClientSession.SubscribeFailed
case ClientConnector.PingFailed =>
ActorMqttClientSession.PingFailed
}
- .watchTermination() { (_, done) =>
+ .watchTermination { (_, done) =>
done.onComplete {
case Success(_) => killSwitch.shutdown()
case Failure(t) => killSwitch.abort(t)
@@ -303,7 +303,7 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit syste
private[streaming] override def eventFlow[A](connectionId: ByteString):
EventFlow[A] =
Flow[ByteString]
.watch(clientConnector.toClassic)
- .watchTermination() {
+ .watchTermination {
case (_, terminated) =>
terminated.onComplete {
case Failure(_: WatchedActorTerminatedException) =>
@@ -531,7 +531,7 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit syste
Future.successful(
Flow[Command[A]]
.watch(serverConnector.toClassic)
- .watchTermination() {
+ .watchTermination {
case (_, terminated) =>
terminated.onComplete {
case Failure(_: WatchedActorTerminatedException) =>
@@ -560,7 +560,7 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit syste
}.mapError {
case ServerConnector.PingFailed =>
ActorMqttServerSession.PingFailed
}
- .watchTermination() { (_, done) =>
+ .watchTermination { (_, done) =>
done.onComplete {
case Success(_) => killSwitch.shutdown()
case Failure(t) => killSwitch.abort(t)
@@ -657,7 +657,7 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit syste
override def eventFlow[A](connectionId: ByteString): EventFlow[A] =
Flow[ByteString]
.watch(serverConnector.toClassic)
- .watchTermination() {
+ .watchTermination {
case (_, terminated) =>
terminated.onComplete {
case Failure(_: WatchedActorTerminatedException) =>
diff --git a/project/PekkoCoreDependency.scala b/project/PekkoCoreDependency.scala
index d3672d829..c127cb61c 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
- override val currentVersion: String = "1.4.0"
+ override val currentVersion: String = "2.0.0-M1"
}
diff --git a/project/PekkoHttpDependency.scala b/project/PekkoHttpDependency.scala
index b516636a8..d380824f3 100644
--- a/project/PekkoHttpDependency.scala
+++ b/project/PekkoHttpDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoHttpDependency extends PekkoDependency {
override val checkProject: String = "pekko-http-testkit"
override val module: Option[String] = Some("http")
- override val currentVersion: String = "1.3.0"
+ override val currentVersion: String = "2.0.0-M1"
}
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 01bf5d9fe..54ab55134 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -29,7 +29,8 @@ addSbtPlugin("com.lightbend.paradox" % "sbt-paradox" % "0.10.7")
addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.6.1")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")
// Pekko gRPC -- sync with PekkoGrpcBinaryVersion in Dependencies.scala
-addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.2.0")
+resolvers += Resolver.ApacheMavenStagingRepo
+addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "2.0.0-M1")
// templating
addSbtPlugin("com.github.sbt" % "sbt-boilerplate" % "0.8.0")
// Run JUnit 5 tests with sbt
diff --git a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
index b5cbf5bfe..569d211e6 100644
--- a/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
+++ b/unix-domain-socket/src/main/scala/org/apache/pekko/stream/connectors/unixdomainsocket/impl/UnixDomainSocketImpl.scala
@@ -340,7 +340,7 @@ private[unixdomainsocket] object UnixDomainSocketImpl {
sel.wakeup()
sent.future.map(_ => bytes)
}
- .watchTermination() {
+ .watchTermination {
case (_, done) =>
done.onComplete { _ =>
sendReceiveContext.send = if (halfClose) {
@@ -401,7 +401,7 @@ private[unixdomainsocket] abstract class UnixDomainSocketImpl(system: ExtendedAc
.map {
case (_, source) =>
source
- .watchTermination() { (mat, done) =>
+ .watchTermination { (mat, done) =>
done
.andThen {
case _ =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]