This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 56591d2d078 KAFKA-19090: Move DelayedFuture and DelayedFuturePurgatory
to server module (#19390)
56591d2d078 is described below
commit 56591d2d078701d6901491684d72cc402bc3f714
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Apr 9 17:52:56 2025 +0800
KAFKA-19090: Move DelayedFuture and DelayedFuturePurgatory to server module
(#19390)
Rewrite these classes in Java and move them to the server module
Reviewers: Mickael Maison <[email protected]>
---
core/src/main/scala/kafka/server/AclApis.scala | 17 +--
.../main/scala/kafka/server/DelayedFuture.scala | 107 -------------------
.../kafka/server/DelayedFutureTest.scala | 96 -----------------
.../kafka/server/purgatory/DelayedFuture.java | 83 +++++++++++++++
.../server/purgatory/DelayedFuturePurgatory.java | 69 ++++++++++++
.../kafka/server/purgatory/DelayedFutureTest.java | 117 +++++++++++++++++++++
6 files changed, 279 insertions(+), 210 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AclApis.scala
b/core/src/main/scala/kafka/server/AclApis.scala
index 4bf57f8a5cc..2edafea2317 100644
--- a/core/src/main/scala/kafka/server/AclApis.scala
+++ b/core/src/main/scala/kafka/server/AclApis.scala
@@ -32,9 +32,11 @@ import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer._
+import org.apache.kafka.server.purgatory.DelayedFuturePurgatory
import java.util
import java.util.concurrent.CompletableFuture
+import java.util.stream.Collectors
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -50,7 +52,7 @@ class AclApis(authHelper: AuthHelper,
config: KafkaConfig) extends Logging {
this.logIdent = "[AclApis-%s-%s] ".format(role, config.nodeId)
private val alterAclsPurgatory =
- new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId =
config.nodeId)
+ new DelayedFuturePurgatory("AlterAcls", config.nodeId)
def isClosed: Boolean = alterAclsPurgatory.isShutdown
@@ -107,11 +109,11 @@ class AclApis(authHelper: AuthHelper,
}
val future = new CompletableFuture[util.List[AclCreationResult]]()
- val createResults = auth.createAcls(request.context,
validBindings.asJava).asScala.map(_.toCompletableFuture)
+ val createResults = auth.createAcls(request.context,
validBindings.asJava).stream().map(_.toCompletableFuture).toList
def sendResponseCallback(): Unit = {
val aclCreationResults = allBindings.map { acl =>
- val result = errorResults.getOrElse(acl,
createResults(validBindings.indexOf(acl)).get)
+ val result = errorResults.getOrElse(acl,
createResults.get(validBindings.indexOf(acl)).get)
val creationResult = new AclCreationResult()
result.exception.toScala.foreach { throwable =>
val apiError = ApiError.fromThrowable(throwable)
@@ -123,7 +125,7 @@ class AclApis(authHelper: AuthHelper,
}
future.complete(aclCreationResults.asJava)
}
- alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs,
createResults, sendResponseCallback)
+ alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs,
createResults, () => sendResponseCallback())
future.thenApply[Unit] { aclCreationResults =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -147,14 +149,15 @@ class AclApis(authHelper: AuthHelper,
val future = new CompletableFuture[util.List[DeleteAclsFilterResult]]()
val deleteResults = auth.deleteAcls(request.context,
deleteAclsRequest.filters)
- .asScala.map(_.toCompletableFuture).toList
+ .stream().map(_.toCompletableFuture).toList
def sendResponseCallback(): Unit = {
- val filterResults =
deleteResults.map(_.get).map(DeleteAclsResponse.filterResult).asJava
+ val filterResults: util.List[DeleteAclsFilterResult] =
deleteResults.stream().map(_.get)
+ .map(DeleteAclsResponse.filterResult).collect(Collectors.toList())
future.complete(filterResults)
}
- alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs,
deleteResults, sendResponseCallback)
+ alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs,
deleteResults, () => sendResponseCallback())
future.thenApply[Unit] { filterResults =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DeleteAclsResponse(
diff --git a/core/src/main/scala/kafka/server/DelayedFuture.scala
b/core/src/main/scala/kafka/server/DelayedFuture.scala
deleted file mode 100644
index a24bc387089..00000000000
--- a/core/src/main/scala/kafka/server/DelayedFuture.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils.Logging
-
-import java.util
-import java.util.concurrent._
-import java.util.function.BiConsumer
-import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.utils.KafkaThread
-import org.apache.kafka.server.purgatory.{DelayedOperation,
DelayedOperationKey, DelayedOperationPurgatory}
-
-import scala.collection.Seq
-
-/**
- * A delayed operation using CompletionFutures that can be created by
KafkaApis and watched
- * in a DelayedFuturePurgatory purgatory. This is used for ACL updates using
async Authorizers.
- */
-class DelayedFuture[T](timeoutMs: Long,
- futures: Seq[CompletableFuture[T]],
- responseCallback: () => Unit)
- extends DelayedOperation(timeoutMs) with Logging {
-
- /**
- * The operation can be completed if all the futures have completed
successfully
- * or failed with exceptions.
- */
- override def tryComplete() : Boolean = {
- trace(s"Trying to complete operation for ${futures.size} futures")
-
- val pending = futures.count(future => !future.isDone)
- if (pending == 0) {
- trace("All futures have been completed or have errors, completing the
delayed operation")
- forceComplete()
- } else {
- trace(s"$pending future still pending, not completing the delayed
operation")
- false
- }
- }
-
- /**
- * Timeout any pending futures and invoke responseCallback. This is invoked
when all
- * futures have completed or the operation has timed out.
- */
- override def onComplete(): Unit = {
- val pendingFutures = futures.filterNot(_.isDone)
- trace(s"Completing operation for ${futures.size} futures, expired
${pendingFutures.size}")
- pendingFutures.foreach(_.completeExceptionally(new
TimeoutException(s"Request has been timed out after $timeoutMs ms")))
- responseCallback.apply()
- }
-
- /**
- * This is invoked after onComplete(), so no actions required.
- */
- override def onExpiration(): Unit = {
- }
-}
-
-class DelayedFuturePurgatory(purgatoryName: String, brokerId: Int) {
- private val purgatory = new
DelayedOperationPurgatory[DelayedFuture[_]](purgatoryName, brokerId)
- private val executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue[Runnable](),
- new ThreadFactory {
- override def newThread(r: Runnable): Thread = new
KafkaThread(s"DelayedExecutor-$purgatoryName", r, true)
- })
- private val purgatoryKey = new DelayedOperationKey() {
- override def keyLabel(): String = "delayed-future-key"
- }
-
- def tryCompleteElseWatch[T](timeoutMs: Long,
- futures: Seq[CompletableFuture[T]],
- responseCallback: () => Unit): DelayedFuture[T]
= {
- val delayedFuture = new DelayedFuture[T](timeoutMs, futures,
responseCallback)
- val done = purgatory.tryCompleteElseWatch(delayedFuture,
util.Collections.singletonList(purgatoryKey))
- if (!done) {
- val callbackAction = new BiConsumer[Void, Throwable]() {
- override def accept(result: Void, exception: Throwable): Unit =
delayedFuture.forceComplete()
- }
- CompletableFuture.allOf(futures.toArray:
_*).whenCompleteAsync(callbackAction, executor)
- }
- delayedFuture
- }
-
- def shutdown(): Unit = {
- executor.shutdownNow()
- executor.awaitTermination(60, TimeUnit.SECONDS)
- purgatory.shutdown()
- }
-
- def isShutdown: Boolean = executor.isShutdown
-}
diff --git
a/core/src/test/scala/integration/kafka/server/DelayedFutureTest.scala
b/core/src/test/scala/integration/kafka/server/DelayedFutureTest.scala
deleted file mode 100644
index e9313159493..00000000000
--- a/core/src/test/scala/integration/kafka/server/DelayedFutureTest.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package integration.kafka.server
-
-import kafka.server.DelayedFuturePurgatory
-import kafka.utils.TestUtils
-import org.apache.kafka.common.utils.Time
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertThrows, assertTrue}
-import org.junit.jupiter.api.Test
-
-import java.util.concurrent.{CompletableFuture, ExecutionException}
-import java.util.concurrent.atomic.AtomicInteger
-import scala.jdk.CollectionConverters.CollectionHasAsScala
-
-class DelayedFutureTest {
-
- @Test
- def testDelayedFuture(): Unit = {
- val purgatoryName = "testDelayedFuture"
- val purgatory = new DelayedFuturePurgatory(purgatoryName, brokerId = 0)
- try {
- val result = new AtomicInteger()
-
- def hasExecutorThread: Boolean =
Thread.getAllStackTraces.keySet.asScala.map(_.getName)
- .exists(_.contains(s"DelayedExecutor-$purgatoryName"))
-
- def updateResult(futures: List[CompletableFuture[Integer]]): Unit =
-
result.set(futures.filterNot(_.isCompletedExceptionally).map(_.get.intValue).sum)
-
- assertFalse(hasExecutorThread, "Unnecessary thread created")
-
- // Two completed futures: callback should be executed immediately on the
same thread
- val futures1 =
List(CompletableFuture.completedFuture(10.asInstanceOf[Integer]),
- CompletableFuture.completedFuture(11.asInstanceOf[Integer]))
- val r1 = purgatory.tryCompleteElseWatch[Integer](100000L, futures1, ()
=> updateResult(futures1))
- assertTrue(r1.isCompleted, "r1 not completed")
- assertEquals(21, result.get())
- assertFalse(hasExecutorThread, "Unnecessary thread created")
-
- // Two delayed futures: callback should wait for both to complete
- result.set(-1)
- val futures2 = List(new CompletableFuture[Integer], new
CompletableFuture[Integer])
- val r2 = purgatory.tryCompleteElseWatch[Integer](100000L, futures2, ()
=> updateResult(futures2))
- assertFalse(r2.isCompleted, "r2 should be incomplete")
- futures2.head.complete(20)
- assertFalse(r2.isCompleted)
- assertEquals(-1, result.get())
- futures2(1).complete(21)
- TestUtils.waitUntilTrue(() => r2.isCompleted, "r2 not completed")
- TestUtils.waitUntilTrue(() => result.get == 41, "callback not invoked")
- assertTrue(hasExecutorThread, "Thread not created for executing delayed
task")
-
- // One immediate and one delayed future: callback should wait for
delayed task to complete
- result.set(-1)
- val futures3 = List(new CompletableFuture[Integer],
CompletableFuture.completedFuture(31.asInstanceOf[Integer]))
- val r3 = purgatory.tryCompleteElseWatch[Integer](100000L, futures3, ()
=> updateResult(futures3))
- assertFalse(r3.isCompleted, "r3 should be incomplete")
- assertEquals(-1, result.get())
- futures3.head.complete(30)
- TestUtils.waitUntilTrue(() => r3.isCompleted, "r3 not completed")
- TestUtils.waitUntilTrue(() => result.get == 61, "callback not invoked")
-
- // One future doesn't complete within timeout. Should expire and invoke
callback after timeout.
- result.set(-1)
- val start = Time.SYSTEM.hiResClockMs
- val expirationMs = 2000L
- val futures4 = List(new CompletableFuture[Integer], new
CompletableFuture[Integer])
- val r4 = purgatory.tryCompleteElseWatch[Integer](expirationMs, futures4,
() => updateResult(futures4))
- futures4.head.complete(40)
- TestUtils.waitUntilTrue(() => futures4(1).isDone, "r4 futures not
expired")
- assertTrue(r4.isCompleted, "r4 not completed after timeout")
- val elapsed = Time.SYSTEM.hiResClockMs - start
- assertTrue(elapsed >= expirationMs, s"Time for expiration $elapsed
should at least $expirationMs")
- assertEquals(40, futures4.head.get)
- assertEquals(classOf[org.apache.kafka.common.errors.TimeoutException],
- assertThrows(classOf[ExecutionException], () =>
futures4(1).get).getCause.getClass)
- assertEquals(40, result.get())
- } finally {
- purgatory.shutdown()
- }
- }
-}
diff --git
a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuture.java
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuture.java
new file mode 100644
index 00000000000..b9fde063f65
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuture.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A delayed operation using CompletionFutures that can be created by
KafkaApis and watched
+ * in a DelayedFuturePurgatory purgatory. This is used for ACL updates using
async Authorizers.
+ */
+public class DelayedFuture<T> extends DelayedOperation {
+
+ private final Logger log = new
LogContext().logger(DelayedFuture.class.getName());
+ private final List<CompletableFuture<T>> futures;
+ private final Runnable responseCallback;
+ private final long timeoutMs;
+
+ public DelayedFuture(long timeoutMs, List<CompletableFuture<T>> futures,
Runnable responseCallback) {
+ super(timeoutMs);
+ this.timeoutMs = timeoutMs;
+ this.futures = futures;
+ this.responseCallback = responseCallback;
+ }
+
+ /**
+ * The operation can be completed if all the futures have completed
successfully
+ * or failed with exceptions.
+ */
+ @Override
+ public boolean tryComplete() {
+ log.trace("Trying to complete operation for {} futures",
futures.size());
+
+ long pending = futures.stream().filter(future ->
!future.isDone()).count();
+ if (pending == 0) {
+ log.trace("All futures have been completed or have errors,
completing the delayed operation");
+ return forceComplete();
+ } else {
+ log.trace("{} future still pending, not completing the delayed
operation", pending);
+ return false;
+ }
+ }
+
+ /**
+ * Timeout any pending futures and invoke responseCallback. This is
invoked when all
+ * futures have completed or the operation has timed out.
+ */
+ @Override
+ public void onComplete() {
+ List<CompletableFuture<T>> pendingFutures =
futures.stream().filter(future -> !future.isDone()).toList();
+ log.trace("Completing operation for {} futures, expired {}",
futures.size(), pendingFutures.size());
+ pendingFutures.forEach(future -> future.completeExceptionally(new
TimeoutException("Request has been timed out after " + timeoutMs + " ms")));
+ responseCallback.run();
+ }
+
+ /**
+ * This is invoked after onComplete(), so no actions required.
+ */
+ @Override
+ public void onExpiration() {
+ // This is invoked after onComplete(), so no actions required.
+ }
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuturePurgatory.java
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuturePurgatory.java
new file mode 100644
index 00000000000..637fdb1aab6
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuturePurgatory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.utils.KafkaThread;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+public class DelayedFuturePurgatory {
+ private final DelayedOperationPurgatory<DelayedFuture<?>> purgatory;
+ private final ThreadPoolExecutor executor;
+ private final DelayedOperationKey purgatoryKey;
+
+ public DelayedFuturePurgatory(String purgatoryName, int brokerId) {
+ this.purgatory = new DelayedOperationPurgatory<>(purgatoryName,
brokerId);
+ this.executor = new ThreadPoolExecutor(
+ 1,
+ 1,
+ 0,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ r -> new KafkaThread("DelayedExecutor-" + purgatoryName, r, true));
+ this.purgatoryKey = () -> "delayed-future-key";
+ }
+
+ public <T> DelayedFuture<T> tryCompleteElseWatch(
+ long timeoutMs,
+ List<CompletableFuture<T>> futures,
+ Runnable responseCallback
+ ) {
+ DelayedFuture<T> delayedFuture = new DelayedFuture<>(timeoutMs,
futures, responseCallback);
+ boolean done = purgatory.tryCompleteElseWatch(delayedFuture,
List.of(purgatoryKey));
+ if (!done) {
+ BiConsumer<Void, Throwable> callbackAction = (result, exception)
-> delayedFuture.forceComplete();
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture<?>[0])).whenCompleteAsync(callbackAction, executor);
+ }
+ return delayedFuture;
+ }
+
+ public void shutdown() throws Exception {
+ executor.shutdownNow();
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+ purgatory.shutdown();
+ }
+
+ public boolean isShutdown() {
+ return executor.isShutdown();
+ }
+}
diff --git
a/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java
b/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java
new file mode 100644
index 00000000000..80efa9930c4
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DelayedFutureTest {
+
+ @Test
+ void testDelayedFuture() throws Exception {
+ String purgatoryName = "testDelayedFuture";
+ DelayedFuturePurgatory purgatory = new
DelayedFuturePurgatory(purgatoryName, 0);
+ try {
+ AtomicInteger result = new AtomicInteger();
+
+ Supplier<Boolean> hasExecutorThread = () ->
Thread.getAllStackTraces().keySet().stream()
+ .map(Thread::getName)
+ .anyMatch(name -> name.contains("DelayedExecutor-" +
purgatoryName));
+
+ Function<List<CompletableFuture<Integer>>, Void> updateResult =
futures -> {
+ result.set(futures.stream()
+
.filter(Predicate.not(CompletableFuture::isCompletedExceptionally))
+ .mapToInt(future -> assertDoesNotThrow(() -> future.get()))
+ .sum());
+ return null;
+ };
+
+ assertFalse(hasExecutorThread.get(), "Unnecessary thread created");
+
+ // Two completed futures: callback should be executed immediately
on the same thread
+ List<CompletableFuture<Integer>> futures1 = List.of(
+ CompletableFuture.completedFuture(10),
+ CompletableFuture.completedFuture(11)
+ );
+ DelayedFuture<Integer> r1 =
purgatory.tryCompleteElseWatch(100000L, futures1, () ->
updateResult.apply(futures1));
+ assertTrue(r1.isCompleted(), "r1 not completed");
+ assertEquals(21, result.get());
+ assertFalse(hasExecutorThread.get(), "Unnecessary thread created");
+
+ // Two delayed futures: callback should wait for both to complete
+ result.set(-1);
+ List<CompletableFuture<Integer>> futures2 = List.of(new
CompletableFuture<>(), new CompletableFuture<>());
+ DelayedFuture<Integer> r2 =
purgatory.tryCompleteElseWatch(100000L, futures2, () ->
updateResult.apply(futures2));
+ assertFalse(r2.isCompleted(), "r2 should be incomplete");
+ futures2.get(0).complete(20);
+ assertFalse(r2.isCompleted());
+ assertEquals(-1, result.get());
+ futures2.get(1).complete(21);
+ TestUtils.waitForCondition(r2::isCompleted, "r2 not completed");
+ TestUtils.waitForCondition(() -> result.get() == 41, "callback not
invoked");
+ assertTrue(hasExecutorThread.get(), "Thread not created for
executing delayed task");
+
+ // One immediate and one delayed future: callback should wait for
delayed task to complete
+ result.set(-1);
+ List<CompletableFuture<Integer>> futures3 = List.of(
+ new CompletableFuture<>(),
+ CompletableFuture.completedFuture(31)
+ );
+ DelayedFuture<Integer> r3 =
purgatory.tryCompleteElseWatch(100000L, futures3, () ->
updateResult.apply(futures3));
+ assertFalse(r3.isCompleted(), "r3 should be incomplete");
+ assertEquals(-1, result.get());
+ futures3.get(0).complete(30);
+ TestUtils.waitForCondition(r3::isCompleted, "r3 not completed");
+ TestUtils.waitForCondition(() -> result.get() == 61, "callback not
invoked");
+
+ // One future doesn't complete within timeout. Should expire and
invoke callback after timeout.
+ result.set(-1);
+ long start = Time.SYSTEM.hiResClockMs();
+ long expirationMs = 2000L;
+ List<CompletableFuture<Integer>> futures4 = List.of(new
CompletableFuture<>(), new CompletableFuture<>());
+ DelayedFuture<Integer> r4 =
purgatory.tryCompleteElseWatch(expirationMs, futures4, () ->
updateResult.apply(futures4));
+ futures4.get(0).complete(40);
+ TestUtils.waitForCondition(() -> futures4.get(1).isDone(), "r4
futures not expired");
+ assertTrue(r4.isCompleted(), "r4 not completed after timeout");
+ long elapsed = Time.SYSTEM.hiResClockMs() - start;
+ assertTrue(elapsed >= expirationMs, "Time for expiration " +
elapsed + " should at least " + expirationMs);
+ assertEquals(40, futures4.get(0).get());
+ Exception exception = assertThrows(ExecutionException.class, () ->
futures4.get(1).get());
+ assertEquals(TimeoutException.class,
exception.getCause().getClass());
+ assertEquals(40, result.get());
+ } finally {
+ purgatory.shutdown();
+ }
+ }
+}