This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0600ac00e9b KAFKA-16065: close DelayedFuturePurgatory in DelayedOperationTest (#15090)
0600ac00e9b is described below

commit 0600ac00e9b7a96c67d6a954123eae4b7ec6f392
Author: Luke Chen <[email protected]>
AuthorDate: Sat Dec 30 02:27:45 2023 +0900

    KAFKA-16065: close DelayedFuturePurgatory in DelayedOperationTest (#15090)
    
    Reviewers: Divij Vaidya <[email protected]>
---
 .../unit/kafka/server/DelayedOperationTest.scala   | 117 +++++++++++----------
 1 file changed, 61 insertions(+), 56 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 73b70e3298e..f05f6a0277a 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -120,62 +120,67 @@ class DelayedOperationTest {
   def testDelayedFuture(): Unit = {
     val purgatoryName = "testDelayedFuture"
     val purgatory = new DelayedFuturePurgatory(purgatoryName, brokerId = 0)
-    val result = new AtomicInteger()
-
-    def hasExecutorThread: Boolean = 
Thread.getAllStackTraces.keySet.asScala.map(_.getName)
-      .exists(_.contains(s"DelayedExecutor-$purgatoryName"))
-    def updateResult(futures: List[CompletableFuture[Integer]]): Unit =
-      
result.set(futures.filterNot(_.isCompletedExceptionally).map(_.get.intValue).sum)
-
-    assertFalse(hasExecutorThread, "Unnecessary thread created")
-
-    // Two completed futures: callback should be executed immediately on the 
same thread
-    val futures1 = 
List(CompletableFuture.completedFuture(10.asInstanceOf[Integer]),
-      CompletableFuture.completedFuture(11.asInstanceOf[Integer]))
-    val r1 = purgatory.tryCompleteElseWatch[Integer](100000L, futures1, () => 
updateResult(futures1))
-    assertTrue(r1.isCompleted, "r1 not completed")
-    assertEquals(21, result.get())
-    assertFalse(hasExecutorThread, "Unnecessary thread created")
-
-    // Two delayed futures: callback should wait for both to complete
-    result.set(-1)
-    val futures2 = List(new CompletableFuture[Integer], new 
CompletableFuture[Integer])
-    val r2 = purgatory.tryCompleteElseWatch[Integer](100000L, futures2, () => 
updateResult(futures2))
-    assertFalse(r2.isCompleted, "r2 should be incomplete")
-    futures2.head.complete(20)
-    assertFalse(r2.isCompleted)
-    assertEquals(-1, result.get())
-    futures2(1).complete(21)
-    TestUtils.waitUntilTrue(() => r2.isCompleted, "r2 not completed")
-    TestUtils.waitUntilTrue(() => result.get == 41, "callback not invoked")
-    assertTrue(hasExecutorThread, "Thread not created for executing delayed 
task")
-
-    // One immediate and one delayed future: callback should wait for delayed 
task to complete
-    result.set(-1)
-    val futures3 = List(new CompletableFuture[Integer], 
CompletableFuture.completedFuture(31.asInstanceOf[Integer]))
-    val r3 = purgatory.tryCompleteElseWatch[Integer](100000L, futures3, () => 
updateResult(futures3))
-    assertFalse(r3.isCompleted, "r3 should be incomplete")
-    assertEquals(-1, result.get())
-    futures3.head.complete(30)
-    TestUtils.waitUntilTrue(() => r3.isCompleted, "r3 not completed")
-    TestUtils.waitUntilTrue(() => result.get == 61, "callback not invoked")
-
-
-    // One future doesn't complete within timeout. Should expire and invoke 
callback after timeout.
-    result.set(-1)
-    val start = Time.SYSTEM.hiResClockMs
-    val expirationMs = 2000L
-    val futures4 = List(new CompletableFuture[Integer], new 
CompletableFuture[Integer])
-    val r4 = purgatory.tryCompleteElseWatch[Integer](expirationMs, futures4, 
() => updateResult(futures4))
-    futures4.head.complete(40)
-    TestUtils.waitUntilTrue(() => futures4(1).isDone, "r4 futures not expired")
-    assertTrue(r4.isCompleted, "r4 not completed after timeout")
-    val elapsed = Time.SYSTEM.hiResClockMs - start
-    assertTrue(elapsed >= expirationMs, s"Time for expiration $elapsed should 
at least $expirationMs")
-    assertEquals(40, futures4.head.get)
-    assertEquals(classOf[org.apache.kafka.common.errors.TimeoutException],
-      assertThrows(classOf[ExecutionException], () => 
futures4(1).get).getCause.getClass)
-    assertEquals(40, result.get())
+    try {
+      val result = new AtomicInteger()
+
+      def hasExecutorThread: Boolean = 
Thread.getAllStackTraces.keySet.asScala.map(_.getName)
+        .exists(_.contains(s"DelayedExecutor-$purgatoryName"))
+
+      def updateResult(futures: List[CompletableFuture[Integer]]): Unit =
+        
result.set(futures.filterNot(_.isCompletedExceptionally).map(_.get.intValue).sum)
+
+      assertFalse(hasExecutorThread, "Unnecessary thread created")
+
+      // Two completed futures: callback should be executed immediately on the 
same thread
+      val futures1 = 
List(CompletableFuture.completedFuture(10.asInstanceOf[Integer]),
+        CompletableFuture.completedFuture(11.asInstanceOf[Integer]))
+      val r1 = purgatory.tryCompleteElseWatch[Integer](100000L, futures1, () 
=> updateResult(futures1))
+      assertTrue(r1.isCompleted, "r1 not completed")
+      assertEquals(21, result.get())
+      assertFalse(hasExecutorThread, "Unnecessary thread created")
+
+      // Two delayed futures: callback should wait for both to complete
+      result.set(-1)
+      val futures2 = List(new CompletableFuture[Integer], new 
CompletableFuture[Integer])
+      val r2 = purgatory.tryCompleteElseWatch[Integer](100000L, futures2, () 
=> updateResult(futures2))
+      assertFalse(r2.isCompleted, "r2 should be incomplete")
+      futures2.head.complete(20)
+      assertFalse(r2.isCompleted)
+      assertEquals(-1, result.get())
+      futures2(1).complete(21)
+      TestUtils.waitUntilTrue(() => r2.isCompleted, "r2 not completed")
+      TestUtils.waitUntilTrue(() => result.get == 41, "callback not invoked")
+      assertTrue(hasExecutorThread, "Thread not created for executing delayed 
task")
+
+      // One immediate and one delayed future: callback should wait for 
delayed task to complete
+      result.set(-1)
+      val futures3 = List(new CompletableFuture[Integer], 
CompletableFuture.completedFuture(31.asInstanceOf[Integer]))
+      val r3 = purgatory.tryCompleteElseWatch[Integer](100000L, futures3, () 
=> updateResult(futures3))
+      assertFalse(r3.isCompleted, "r3 should be incomplete")
+      assertEquals(-1, result.get())
+      futures3.head.complete(30)
+      TestUtils.waitUntilTrue(() => r3.isCompleted, "r3 not completed")
+      TestUtils.waitUntilTrue(() => result.get == 61, "callback not invoked")
+
+
+      // One future doesn't complete within timeout. Should expire and invoke 
callback after timeout.
+      result.set(-1)
+      val start = Time.SYSTEM.hiResClockMs
+      val expirationMs = 2000L
+      val futures4 = List(new CompletableFuture[Integer], new 
CompletableFuture[Integer])
+      val r4 = purgatory.tryCompleteElseWatch[Integer](expirationMs, futures4, 
() => updateResult(futures4))
+      futures4.head.complete(40)
+      TestUtils.waitUntilTrue(() => futures4(1).isDone, "r4 futures not 
expired")
+      assertTrue(r4.isCompleted, "r4 not completed after timeout")
+      val elapsed = Time.SYSTEM.hiResClockMs - start
+      assertTrue(elapsed >= expirationMs, s"Time for expiration $elapsed 
should at least $expirationMs")
+      assertEquals(40, futures4.head.get)
+      assertEquals(classOf[org.apache.kafka.common.errors.TimeoutException],
+        assertThrows(classOf[ExecutionException], () => 
futures4(1).get).getCause.getClass)
+      assertEquals(40, result.get())
+    } finally {
+      purgatory.shutdown()
+    }
   }
 
   @Test

Reply via email to