This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0600ac00e9b KAFKA-16065: close DelayedFuturePurgatory in DelayedOperationTest (#15090)
0600ac00e9b is described below
commit 0600ac00e9b7a96c67d6a954123eae4b7ec6f392
Author: Luke Chen <[email protected]>
AuthorDate: Sat Dec 30 02:27:45 2023 +0900
KAFKA-16065: close DelayedFuturePurgatory in DelayedOperationTest (#15090)
Reviewers: Divij Vaidya <[email protected]>
---
.../unit/kafka/server/DelayedOperationTest.scala | 117 +++++++++++----------
1 file changed, 61 insertions(+), 56 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 73b70e3298e..f05f6a0277a 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -120,62 +120,67 @@ class DelayedOperationTest {
def testDelayedFuture(): Unit = {
val purgatoryName = "testDelayedFuture"
val purgatory = new DelayedFuturePurgatory(purgatoryName, brokerId = 0)
- val result = new AtomicInteger()
-
- def hasExecutorThread: Boolean =
Thread.getAllStackTraces.keySet.asScala.map(_.getName)
- .exists(_.contains(s"DelayedExecutor-$purgatoryName"))
- def updateResult(futures: List[CompletableFuture[Integer]]): Unit =
-
result.set(futures.filterNot(_.isCompletedExceptionally).map(_.get.intValue).sum)
-
- assertFalse(hasExecutorThread, "Unnecessary thread created")
-
- // Two completed futures: callback should be executed immediately on the
same thread
- val futures1 =
List(CompletableFuture.completedFuture(10.asInstanceOf[Integer]),
- CompletableFuture.completedFuture(11.asInstanceOf[Integer]))
- val r1 = purgatory.tryCompleteElseWatch[Integer](100000L, futures1, () =>
updateResult(futures1))
- assertTrue(r1.isCompleted, "r1 not completed")
- assertEquals(21, result.get())
- assertFalse(hasExecutorThread, "Unnecessary thread created")
-
- // Two delayed futures: callback should wait for both to complete
- result.set(-1)
- val futures2 = List(new CompletableFuture[Integer], new
CompletableFuture[Integer])
- val r2 = purgatory.tryCompleteElseWatch[Integer](100000L, futures2, () =>
updateResult(futures2))
- assertFalse(r2.isCompleted, "r2 should be incomplete")
- futures2.head.complete(20)
- assertFalse(r2.isCompleted)
- assertEquals(-1, result.get())
- futures2(1).complete(21)
- TestUtils.waitUntilTrue(() => r2.isCompleted, "r2 not completed")
- TestUtils.waitUntilTrue(() => result.get == 41, "callback not invoked")
- assertTrue(hasExecutorThread, "Thread not created for executing delayed
task")
-
- // One immediate and one delayed future: callback should wait for delayed
task to complete
- result.set(-1)
- val futures3 = List(new CompletableFuture[Integer],
CompletableFuture.completedFuture(31.asInstanceOf[Integer]))
- val r3 = purgatory.tryCompleteElseWatch[Integer](100000L, futures3, () =>
updateResult(futures3))
- assertFalse(r3.isCompleted, "r3 should be incomplete")
- assertEquals(-1, result.get())
- futures3.head.complete(30)
- TestUtils.waitUntilTrue(() => r3.isCompleted, "r3 not completed")
- TestUtils.waitUntilTrue(() => result.get == 61, "callback not invoked")
-
-
- // One future doesn't complete within timeout. Should expire and invoke
callback after timeout.
- result.set(-1)
- val start = Time.SYSTEM.hiResClockMs
- val expirationMs = 2000L
- val futures4 = List(new CompletableFuture[Integer], new
CompletableFuture[Integer])
- val r4 = purgatory.tryCompleteElseWatch[Integer](expirationMs, futures4,
() => updateResult(futures4))
- futures4.head.complete(40)
- TestUtils.waitUntilTrue(() => futures4(1).isDone, "r4 futures not expired")
- assertTrue(r4.isCompleted, "r4 not completed after timeout")
- val elapsed = Time.SYSTEM.hiResClockMs - start
- assertTrue(elapsed >= expirationMs, s"Time for expiration $elapsed should
at least $expirationMs")
- assertEquals(40, futures4.head.get)
- assertEquals(classOf[org.apache.kafka.common.errors.TimeoutException],
- assertThrows(classOf[ExecutionException], () =>
futures4(1).get).getCause.getClass)
- assertEquals(40, result.get())
+ try {
+ val result = new AtomicInteger()
+
+ def hasExecutorThread: Boolean =
Thread.getAllStackTraces.keySet.asScala.map(_.getName)
+ .exists(_.contains(s"DelayedExecutor-$purgatoryName"))
+
+ def updateResult(futures: List[CompletableFuture[Integer]]): Unit =
+
result.set(futures.filterNot(_.isCompletedExceptionally).map(_.get.intValue).sum)
+
+ assertFalse(hasExecutorThread, "Unnecessary thread created")
+
+ // Two completed futures: callback should be executed immediately on the
same thread
+ val futures1 =
List(CompletableFuture.completedFuture(10.asInstanceOf[Integer]),
+ CompletableFuture.completedFuture(11.asInstanceOf[Integer]))
+ val r1 = purgatory.tryCompleteElseWatch[Integer](100000L, futures1, ()
=> updateResult(futures1))
+ assertTrue(r1.isCompleted, "r1 not completed")
+ assertEquals(21, result.get())
+ assertFalse(hasExecutorThread, "Unnecessary thread created")
+
+ // Two delayed futures: callback should wait for both to complete
+ result.set(-1)
+ val futures2 = List(new CompletableFuture[Integer], new
CompletableFuture[Integer])
+ val r2 = purgatory.tryCompleteElseWatch[Integer](100000L, futures2, ()
=> updateResult(futures2))
+ assertFalse(r2.isCompleted, "r2 should be incomplete")
+ futures2.head.complete(20)
+ assertFalse(r2.isCompleted)
+ assertEquals(-1, result.get())
+ futures2(1).complete(21)
+ TestUtils.waitUntilTrue(() => r2.isCompleted, "r2 not completed")
+ TestUtils.waitUntilTrue(() => result.get == 41, "callback not invoked")
+ assertTrue(hasExecutorThread, "Thread not created for executing delayed
task")
+
+ // One immediate and one delayed future: callback should wait for
delayed task to complete
+ result.set(-1)
+ val futures3 = List(new CompletableFuture[Integer],
CompletableFuture.completedFuture(31.asInstanceOf[Integer]))
+ val r3 = purgatory.tryCompleteElseWatch[Integer](100000L, futures3, ()
=> updateResult(futures3))
+ assertFalse(r3.isCompleted, "r3 should be incomplete")
+ assertEquals(-1, result.get())
+ futures3.head.complete(30)
+ TestUtils.waitUntilTrue(() => r3.isCompleted, "r3 not completed")
+ TestUtils.waitUntilTrue(() => result.get == 61, "callback not invoked")
+
+
+ // One future doesn't complete within timeout. Should expire and invoke
callback after timeout.
+ result.set(-1)
+ val start = Time.SYSTEM.hiResClockMs
+ val expirationMs = 2000L
+ val futures4 = List(new CompletableFuture[Integer], new
CompletableFuture[Integer])
+ val r4 = purgatory.tryCompleteElseWatch[Integer](expirationMs, futures4,
() => updateResult(futures4))
+ futures4.head.complete(40)
+ TestUtils.waitUntilTrue(() => futures4(1).isDone, "r4 futures not
expired")
+ assertTrue(r4.isCompleted, "r4 not completed after timeout")
+ val elapsed = Time.SYSTEM.hiResClockMs - start
+ assertTrue(elapsed >= expirationMs, s"Time for expiration $elapsed
should at least $expirationMs")
+ assertEquals(40, futures4.head.get)
+ assertEquals(classOf[org.apache.kafka.common.errors.TimeoutException],
+ assertThrows(classOf[ExecutionException], () =>
futures4(1).get).getCause.getClass)
+ assertEquals(40, result.get())
+ } finally {
+ purgatory.shutdown()
+ }
}
@Test