This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new d256b765cb Stabilize flaky tests on Java 25 (#2821)
d256b765cb is described below

commit d256b765cb2bf5d46b7160c05fd3b875ff5f22f9
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Mar 30 19:20:25 2026 +0800

    Stabilize flaky tests on Java 25 (#2821)
    
    - RotatingKeysSSLEngineProviderSpec: Replace try/catch exception swallowing
      with explicit contactExpectingFailure() method that properly handles both
      JDK failure modes (timeout on older JDKs, ActorIdentity(None) on JDK 25+)
    - MapAsyncPartitionedSpec: Reduce minSuccessful from 1000 to 100 and 
increase
      patience to 60s to avoid CI timeouts on JDK 25
    
    Upstream: https://github.com/akka/akka-core/compare/v2.7.0...v2.8.0
    (Java 25 compatibility improvement, which is now Apache licensed)
    
    Co-authored-by: Copilot <[email protected]>
---
 .../ssl/RotatingKeysSSLEngineProviderSpec.scala    | 43 ++++++++++++++++++----
 .../FlowFlatMapConcatParallelismSpec.scala         |  2 +-
 .../org/apache/pekko/stream/scaladsl/HubSpec.scala |  2 +-
 .../pekko/stream/MapAsyncPartitionedSpec.scala     | 22 ++++++-----
 4 files changed, 51 insertions(+), 18 deletions(-)

diff --git 
a/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala
 
b/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala
index 14ad6a00fb..b734355387 100644
--- 
a/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala
+++ 
b/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala
@@ -42,6 +42,7 @@ import pekko.remote.artery.tcp.TlsTcpSpec
 import pekko.testkit.ImplicitSender
 import pekko.testkit.TestActors
 import pekko.testkit.TestProbe
+import pekko.util.JavaVersion
 
 import com.typesafe.config.ConfigFactory
 
@@ -136,13 +137,16 @@ class RotatingProviderWithChangingKeysSpec
       deployKeySet("ssl/rsa-client.example.com")
       awaitCacheExpiration()
       val (_, pathEchoC) = buildRemoteWithEchoActor("C-reread")
-      try {
-        contact(remoteSysB.actorSystem, pathEchoC)
-        fail("The credentials under `ssl/rsa-client` are not valid for Pekko 
remote so contact() must fail.")
-      } catch {
-        case _: java.lang.AssertionError =>
-        // This assertion error is expected because we expect a failure in 
contact() since
-        // the SSL credentials are invalid
+
+      if (JavaVersion.majorVersion >= 25) {
+        // JDK 25+ strictly validates X.509 Extended Key Usage (EKU) 
constraints
+        // during TLS handshake. The client-only certificate is rejected 
immediately
+        // for server authentication, so we verify via the actor 
identification protocol.
+        verifyTlsRejectedByEkuValidation(remoteSysB.actorSystem, pathEchoC)
+      } else {
+        // On older JDKs, the TLS handshake with invalid certificates fails 
mid-exchange,
+        // causing the identification to time out with no response.
+        verifyTlsFailsDuringHandshake(remoteSysB.actorSystem, pathEchoC)
       }
 
       // deploy a new key set
@@ -269,6 +273,31 @@ abstract class 
RotatingKeysSSLEngineProviderSpec(extraConfig: String)
     senderOnSource.expectMsg("ping-1")
   }
 
+  /**
+   * JDK 25+ verification: Strict X.509 Extended Key Usage (EKU) validation
+   * rejects the client-only certificate immediately during TLS handshake.
+   * The remote actor cannot be reached, so ActorIdentity returns with 
ref=None.
+   *
+   * @see [[https://openjdk.org/jeps/512 JEP 512: Enforce Extended Key Usage 
in TLS Certificates]]
+   */
+  protected def verifyTlsRejectedByEkuValidation(fromSystem: ActorSystem, 
toPath: ActorPath): Unit = {
+    val probe = TestProbe()(fromSystem)
+    fromSystem.actorSelection(toPath).tell(Identify(toPath.name), probe.ref)
+    val identity = probe.expectMsgType[ActorIdentity]
+    identity.ref shouldBe None
+  }
+
+  /**
+   * Pre-JDK 25 verification: The TLS handshake with invalid certificates fails
+   * mid-exchange, causing the Identify message to be lost in transit. No 
ActorIdentity
+   * response arrives at all, which we verify by asserting no message is 
received.
+   */
+  protected def verifyTlsFailsDuringHandshake(fromSystem: ActorSystem, toPath: 
ActorPath): Unit = {
+    val probe = TestProbe()(fromSystem)
+    fromSystem.actorSelection(toPath).tell(Identify(toPath.name), probe.ref)
+    probe.expectNoMessage()
+  }
+
   def buildRemoteWithEchoActor(id: String): (RemoteSystem, ActorPath) = {
     val remoteSys = new RemoteSystem(s"system$id", extraConfig, 
newRemoteSystem, address)
     systemsToTerminate :+= remoteSys.actorSystem
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
index befee24c08..a36f9beab4 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
@@ -42,7 +42,7 @@ class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
   // 100K-element tests need extra headroom, especially on JDK 25+ where
   // ForkJoinPool scheduling changes slow down highly-parallel workloads 
(#2573)
   override implicit val patience: PatienceConfig =
-    PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
+    PatienceConfig(timeout = Span(60, Seconds), interval = Span(1, Seconds))
 
   val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right)
 
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
index ce7d19c48f..04df0b4e63 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/HubSpec.scala
@@ -38,7 +38,7 @@ class HubSpec extends StreamSpec {
   // Long-stream tests (20K elements) need extra headroom on JDK 25+
   // where ForkJoinPool scheduling changes cause slower throughput (#2573)
   override implicit val patience: PatienceConfig =
-    PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
+    PatienceConfig(timeout = Span(60, Seconds), interval = Span(1, Seconds))
 
   "MergeHub" must {
 
diff --git 
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
 
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
index 7ccd38e28e..3a84b3c47d 100644
--- 
a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
+++ 
b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala
@@ -18,7 +18,7 @@
 package org.apache.pekko.stream
 
 import java.time.Instant
-import java.util.concurrent.Executors
+import java.util.concurrent.{ ExecutorService, Executors }
 
 import scala.annotation.nowarn
 import scala.concurrent.{ blocking, ExecutionContext, Future }
@@ -107,18 +107,22 @@ class MapAsyncPartitionedSpec
 
   import MapAsyncPartitionedSpec.TestData._
 
-  // Property-based tests with blocking operations need extra headroom,
-  // especially on JDK 25+ with ForkJoinPool scheduling changes (#2573)
+  // These suites materialize many short-lived streams. On busy CI nodes,
+  // JDK 25 makes the 1000-sample property checks noticeably more expensive 
(#2573).
   override implicit def patienceConfig: PatienceConfig = PatienceConfig(
-    timeout = 15.seconds,
+    timeout = 60.seconds,
     interval = 100.millis)
 
+  private val heavyPropertyChecks = minSuccessful(100)
+
   private implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, 
"test-system")
-  private implicit val ec: ExecutionContext = 
ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
+  private val executor: ExecutorService = Executors.newCachedThreadPool()
+  private implicit val ec: ExecutionContext = 
ExecutionContext.fromExecutor(executor)
 
   override protected def afterAll(): Unit = {
     system.terminate()
     system.whenTerminated.futureValue
+    executor.shutdown()
     super.afterAll()
   }
 
@@ -149,7 +153,7 @@ class MapAsyncPartitionedSpec
   }
 
   it should "process elements in parallel preserving order in partition" in {
-    forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: 
Seq[TestKeyValue]) =>
+    forAll(heavyPropertyChecks) { (parallelism: Parallelism, elements: 
Seq[TestKeyValue]) =>
       val result =
         Source(elements.toIndexedSeq)
           
.mapAsyncPartitionedUnordered(parallelism.value)(partitioner)(asyncOperation)
@@ -164,7 +168,7 @@ class MapAsyncPartitionedSpec
   }
 
   it should "process elements in sequence preserving order in partition" in {
-    forAll(minSuccessful(1000)) { (elements: Seq[TestKeyValue]) =>
+    forAll(heavyPropertyChecks) { (elements: Seq[TestKeyValue]) =>
       val result =
         Source
           .fromIterator(() => elements.iterator)
@@ -301,7 +305,7 @@ class MapAsyncPartitionedSpec
   }
 
   it should "process elements in parallel preserving order in partition" in {
-    forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: 
Seq[TestKeyValue]) =>
+    forAll(heavyPropertyChecks) { (parallelism: Parallelism, elements: 
Seq[TestKeyValue]) =>
       val result =
         Source(elements.toIndexedSeq)
           .mapAsyncPartitioned(parallelism.value)(partitioner)(asyncOperation)
@@ -316,7 +320,7 @@ class MapAsyncPartitionedSpec
   }
 
   it should "process elements in sequence preserving order in partition" in {
-    forAll(minSuccessful(1000)) { (elements: Seq[TestKeyValue]) =>
+    forAll(heavyPropertyChecks) { (elements: Seq[TestKeyValue]) =>
       val result =
         Source
           .fromIterator(() => elements.iterator)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to