This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new d256b765cb Stabilize flaky tests on Java 25 (#2821)
d256b765cb is described below
commit d256b765cb2bf5d46b7160c05fd3b875ff5f22f9
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Mar 30 19:20:25 2026 +0800
Stabilize flaky tests on Java 25 (#2821)
- RotatingKeysSSLEngineProviderSpec: Replace try/catch exception swallowing
with explicit verifyTlsRejectedByEkuValidation()/verifyTlsFailsDuringHandshake()
methods that properly handle both JDK failure modes (timeout on older JDKs,
ActorIdentity(None) on JDK 25+)
- MapAsyncPartitionedSpec: Reduce minSuccessful from 1000 to 100 and
increase patience to 60s to avoid CI timeouts on JDK 25
Upstream: https://github.com/akka/akka-core/compare/v2.7.0...v2.8.0
(Java 25 compatibility improvement, which is now Apache licensed)
Co-authored-by: Copilot <[email protected]>
---
.../ssl/RotatingKeysSSLEngineProviderSpec.scala | 43 ++++++++++++++++++----
.../FlowFlatMapConcatParallelismSpec.scala | 2 +-
.../org/apache/pekko/stream/scaladsl/HubSpec.scala | 2 +-
.../pekko/stream/MapAsyncPartitionedSpec.scala | 22 ++++++-----
4 files changed, 51 insertions(+), 18 deletions(-)
diff --git
a/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala
b/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala
index 14ad6a00fb..b734355387 100644
---
a/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala
+++
b/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala
@@ -42,6 +42,7 @@ import pekko.remote.artery.tcp.TlsTcpSpec
import pekko.testkit.ImplicitSender
import pekko.testkit.TestActors
import pekko.testkit.TestProbe
+import pekko.util.JavaVersion
import com.typesafe.config.ConfigFactory
@@ -136,13 +137,16 @@ class RotatingProviderWithChangingKeysSpec
deployKeySet("ssl/rsa-client.example.com")
awaitCacheExpiration()
val (_, pathEchoC) = buildRemoteWithEchoActor("C-reread")
- try {
- contact(remoteSysB.actorSystem, pathEchoC)
- fail("The credentials under `ssl/rsa-client` are not valid for Pekko
remote so contact() must fail.")
- } catch {
- case _: java.lang.AssertionError =>
- // This assertion error is expected because we expect a failure in
contact() since
- // the SSL credentials are invalid
+
+ if (JavaVersion.majorVersion >= 25) {
+ // JDK 25+ strictly validates X.509 Extended Key Usage (EKU)
constraints
+ // during TLS handshake. The client-only certificate is rejected
immediately
+ // for server authentication, so we verify via the actor
identification protocol.
+ verifyTlsRejectedByEkuValidation(remoteSysB.actorSystem, pathEchoC)
+ } else {
+ // On older JDKs, the TLS handshake with invalid certificates fails
mid-exchange,
+ // causing the identification to time out with no response.
+ verifyTlsFailsDuringHandshake(remoteSysB.actorSystem, pathEchoC)
}
// deploy a new key set
@@ -269,6 +273,31 @@ abstract class
RotatingKeysSSLEngineProviderSpec(extraConfig: String)
senderOnSource.expectMsg("ping-1")
}
+ /**
+ * JDK 25+ verification: Strict X.509 Extended Key Usage (EKU) validation
+ * rejects the client-only certificate immediately during TLS handshake.
+ * The remote actor cannot be reached, so ActorIdentity returns with
ref=None.
+ *
+ * @see [[https://openjdk.org/jeps/512 JEP 512: Enforce Extended Key Usage
in TLS Certificates]]
+ */
+ protected def verifyTlsRejectedByEkuValidation(fromSystem: ActorSystem,
toPath: ActorPath): Unit = {
+ val probe = TestProbe()(fromSystem)
+ fromSystem.actorSelection(toPath).tell(Identify(toPath.name), probe.ref)
+ val identity = probe.expectMsgType[ActorIdentity]
+ identity.ref shouldBe None
+ }
+
+ /**
+ * Pre-JDK 25 verification: The TLS handshake with invalid certificates fails
+ * mid-exchange, causing the Identify message to be lost in transit. No
ActorIdentity
+ * response arrives at all, which we verify by asserting no message is
received.
+ */
+ protected def verifyTlsFailsDuringHandshake(fromSystem: ActorSystem, toPath:
ActorPath): Unit = {
+ val probe = TestProbe()(fromSystem)
+ fromSystem.actorSelection(toPath).tell(Identify(toPath.name), probe.ref)
+ probe.expectNoMessage()
+ }
+
def buildRemoteWithEchoActor(id: String): (RemoteSystem, ActorPath) = {
val remoteSys = new RemoteSystem(s"system$id", extraConfig,
newRemoteSystem, address)
systemsToTerminate :+= remoteSys.actorSystem
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
index befee24c08..a36f9beab4 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
@@ -42,7 +42,7 @@ class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
// 100K-element tests need extra headroom, especially on JDK 25+ where
// ForkJoinPool scheduling changes slow down highly-parallel workloads
(#2573)
override implicit val patience: PatienceConfig =
- PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
+ PatienceConfig(timeout = Span(60, Seconds), interval = Span(1, Seconds))
val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index ce7d19c48f..04df0b4e63 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -38,7 +38,7 @@ class HubSpec extends StreamSpec {
// Long-stream tests (20K elements) need extra headroom on JDK 25+
// where ForkJoinPool scheduling changes cause slower throughput (#2573)
override implicit val patience: PatienceConfig =
- PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
+ PatienceConfig(timeout = Span(60, Seconds), interval = Span(1, Seconds))
"MergeHub" must {
diff --git
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
index 7ccd38e28e..3a84b3c47d 100644
---
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
+++
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
@@ -18,7 +18,7 @@
package org.apache.pekko.stream
import java.time.Instant
-import java.util.concurrent.Executors
+import java.util.concurrent.{ ExecutorService, Executors }
import scala.annotation.nowarn
import scala.concurrent.{ blocking, ExecutionContext, Future }
@@ -107,18 +107,22 @@ class MapAsyncPartitionedSpec
import MapAsyncPartitionedSpec.TestData._
- // Property-based tests with blocking operations need extra headroom,
- // especially on JDK 25+ with ForkJoinPool scheduling changes (#2573)
+ // These suites materialize many short-lived streams. On busy CI nodes,
+ // JDK 25 makes the 1000-sample property checks noticeably more expensive
(#2573).
override implicit def patienceConfig: PatienceConfig = PatienceConfig(
- timeout = 15.seconds,
+ timeout = 60.seconds,
interval = 100.millis)
+ private val heavyPropertyChecks = minSuccessful(100)
+
private implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty,
"test-system")
- private implicit val ec: ExecutionContext =
ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
+ private val executor: ExecutorService = Executors.newCachedThreadPool()
+ private implicit val ec: ExecutionContext =
ExecutionContext.fromExecutor(executor)
override protected def afterAll(): Unit = {
system.terminate()
system.whenTerminated.futureValue
+ executor.shutdown()
super.afterAll()
}
@@ -149,7 +153,7 @@ class MapAsyncPartitionedSpec
}
it should "process elements in parallel preserving order in partition" in {
- forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements:
Seq[TestKeyValue]) =>
+ forAll(heavyPropertyChecks) { (parallelism: Parallelism, elements:
Seq[TestKeyValue]) =>
val result =
Source(elements.toIndexedSeq)
.mapAsyncPartitionedUnordered(parallelism.value)(partitioner)(asyncOperation)
@@ -164,7 +168,7 @@ class MapAsyncPartitionedSpec
}
it should "process elements in sequence preserving order in partition" in {
- forAll(minSuccessful(1000)) { (elements: Seq[TestKeyValue]) =>
+ forAll(heavyPropertyChecks) { (elements: Seq[TestKeyValue]) =>
val result =
Source
.fromIterator(() => elements.iterator)
@@ -301,7 +305,7 @@ class MapAsyncPartitionedSpec
}
it should "process elements in parallel preserving order in partition" in {
- forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements:
Seq[TestKeyValue]) =>
+ forAll(heavyPropertyChecks) { (parallelism: Parallelism, elements:
Seq[TestKeyValue]) =>
val result =
Source(elements.toIndexedSeq)
.mapAsyncPartitioned(parallelism.value)(partitioner)(asyncOperation)
@@ -316,7 +320,7 @@ class MapAsyncPartitionedSpec
}
it should "process elements in sequence preserving order in partition" in {
- forAll(minSuccessful(1000)) { (elements: Seq[TestKeyValue]) =>
+ forAll(heavyPropertyChecks) { (elements: Seq[TestKeyValue]) =>
val result =
Source
.fromIterator(() => elements.iterator)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]