This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new fcc541a71f test: clean up TLS edge streams after early cancellation 
(#3000)
fcc541a71f is described below

commit fcc541a71f90338b9c2f17df0b691913f0955831
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu May 28 19:15:57 2026 +0800

    test: clean up TLS edge streams after early cancellation (#3000)
    
    Motivation:
    JDK 25 nightly builds time out in repeated TlsGraphStageEdgeCasesSpec 
early-cancellation scenarios because earlier materializations can keep draining 
after the expected bytes have been collected.
    
    Modification:
    Materialize collectExactly with a KillSwitch and watch stream termination, 
then shut down and await the stream in finally after the expected bytes are 
collected.
    
    Result:
    Repeated TLS edge-case checks do not leave prior materializations running 
in the same actor system.
    
    Tests:
    - JDK 25 nightly-style virtualized stream-dispatcher flags: stream-tests / 
Test / testOnly org.apache.pekko.stream.io.TlsGraphStageEdgeCasesSpec
    - scalafmt --mode diff-ref=origin/main --quiet
    - scalafmt --list --mode diff-ref=origin/main
    - git diff --check
    
    References:
    Refs #2994
---
 .../stream/io/TlsGraphStageEdgeCasesSpec.scala     | 26 +++++++++++++++-------
 1 file changed, 18 insertions(+), 8 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala
index 2e9e3216bb..718867a9b8 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala
@@ -68,14 +68,24 @@ class TlsGraphStageEdgeCasesSpec extends 
StreamSpec(TlsGraphStageEdgeCasesSpec.c
   private def collectExactly(
       stream: Source[SslTlsInbound, NotUsed],
       expectedBytes: Int,
-      timeout: FiniteDuration = 30.seconds): ByteString =
-    Await.result(
-      stream
-        .collect { case SessionBytes(_, b) => b }
-        .scan(ByteString.empty)(_ ++ _)
-        .dropWhile(_.size < expectedBytes)
-        .runWith(Sink.headOption),
-      timeout.dilated).getOrElse(ByteString.empty)
+      timeout: FiniteDuration = 30.seconds): ByteString = {
+    val ((killSwitch, streamDone), result) = stream
+      .viaMat(KillSwitches.single)(Keep.right)
+      .watchTermination(Keep.both)
+      .toMat(
+        Flow[SslTlsInbound]
+          .collect { case SessionBytes(_, b) => b }
+          .scan(ByteString.empty)(_ ++ _)
+          .dropWhile(_.size < expectedBytes)
+          .toMat(Sink.headOption)(Keep.right))(Keep.both)
+      .run()
+
+    try Await.result(result, timeout.dilated).getOrElse(ByteString.empty)
+    finally {
+      killSwitch.shutdown()
+      Await.result(streamDone, timeout.dilated)
+    }
+  }
 
   "TlsGraphStage" must {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to