This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56591d2d078 KAFKA-19090: Move DelayedFuture and DelayedFuturePurgatory 
to server module (#19390)
56591d2d078 is described below

commit 56591d2d078701d6901491684d72cc402bc3f714
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Apr 9 17:52:56 2025 +0800

    KAFKA-19090: Move DelayedFuture and DelayedFuturePurgatory to server module 
(#19390)
    
    Rewrite these classes in Java and move them to the server module
    
    Reviewers: Mickael Maison <[email protected]>
---
 core/src/main/scala/kafka/server/AclApis.scala     |  17 +--
 .../main/scala/kafka/server/DelayedFuture.scala    | 107 -------------------
 .../kafka/server/DelayedFutureTest.scala           |  96 -----------------
 .../kafka/server/purgatory/DelayedFuture.java      |  83 +++++++++++++++
 .../server/purgatory/DelayedFuturePurgatory.java   |  69 ++++++++++++
 .../kafka/server/purgatory/DelayedFutureTest.java  | 117 +++++++++++++++++++++
 6 files changed, 279 insertions(+), 210 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AclApis.scala 
b/core/src/main/scala/kafka/server/AclApis.scala
index 4bf57f8a5cc..2edafea2317 100644
--- a/core/src/main/scala/kafka/server/AclApis.scala
+++ b/core/src/main/scala/kafka/server/AclApis.scala
@@ -32,9 +32,11 @@ import org.apache.kafka.common.resource.ResourceType
 import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer._
+import org.apache.kafka.server.purgatory.DelayedFuturePurgatory
 
 import java.util
 import java.util.concurrent.CompletableFuture
+import java.util.stream.Collectors
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -50,7 +52,7 @@ class AclApis(authHelper: AuthHelper,
               config: KafkaConfig) extends Logging {
   this.logIdent = "[AclApis-%s-%s] ".format(role, config.nodeId)
   private val alterAclsPurgatory =
-      new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = 
config.nodeId)
+      new DelayedFuturePurgatory("AlterAcls", config.nodeId)
 
   def isClosed: Boolean = alterAclsPurgatory.isShutdown
 
@@ -107,11 +109,11 @@ class AclApis(authHelper: AuthHelper,
         }
 
         val future = new CompletableFuture[util.List[AclCreationResult]]()
-        val createResults = auth.createAcls(request.context, 
validBindings.asJava).asScala.map(_.toCompletableFuture)
+        val createResults = auth.createAcls(request.context, 
validBindings.asJava).stream().map(_.toCompletableFuture).toList
 
         def sendResponseCallback(): Unit = {
           val aclCreationResults = allBindings.map { acl =>
-            val result = errorResults.getOrElse(acl, 
createResults(validBindings.indexOf(acl)).get)
+            val result = errorResults.getOrElse(acl, 
createResults.get(validBindings.indexOf(acl)).get)
             val creationResult = new AclCreationResult()
             result.exception.toScala.foreach { throwable =>
               val apiError = ApiError.fromThrowable(throwable)
@@ -123,7 +125,7 @@ class AclApis(authHelper: AuthHelper,
           }
           future.complete(aclCreationResults.asJava)
         }
-        alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, 
createResults, sendResponseCallback)
+        alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, 
createResults, () => sendResponseCallback())
 
         future.thenApply[Unit] { aclCreationResults =>
           requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
@@ -147,14 +149,15 @@ class AclApis(authHelper: AuthHelper,
 
         val future = new CompletableFuture[util.List[DeleteAclsFilterResult]]()
         val deleteResults = auth.deleteAcls(request.context, 
deleteAclsRequest.filters)
-          .asScala.map(_.toCompletableFuture).toList
+          .stream().map(_.toCompletableFuture).toList
 
         def sendResponseCallback(): Unit = {
-          val filterResults = 
deleteResults.map(_.get).map(DeleteAclsResponse.filterResult).asJava
+          val filterResults: util.List[DeleteAclsFilterResult] = 
deleteResults.stream().map(_.get)
+            .map(DeleteAclsResponse.filterResult).collect(Collectors.toList())
           future.complete(filterResults)
         }
 
-        alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, 
deleteResults, sendResponseCallback)
+        alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs, 
deleteResults, () => sendResponseCallback())
         future.thenApply[Unit] { filterResults =>
           requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
             new DeleteAclsResponse(
diff --git a/core/src/main/scala/kafka/server/DelayedFuture.scala 
b/core/src/main/scala/kafka/server/DelayedFuture.scala
deleted file mode 100644
index a24bc387089..00000000000
--- a/core/src/main/scala/kafka/server/DelayedFuture.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import kafka.utils.Logging
-
-import java.util
-import java.util.concurrent._
-import java.util.function.BiConsumer
-import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.utils.KafkaThread
-import org.apache.kafka.server.purgatory.{DelayedOperation, 
DelayedOperationKey, DelayedOperationPurgatory}
-
-import scala.collection.Seq
-
-/**
-  * A delayed operation using CompletionFutures that can be created by 
KafkaApis and watched
-  * in a DelayedFuturePurgatory purgatory. This is used for ACL updates using 
async Authorizers.
-  */
-class DelayedFuture[T](timeoutMs: Long,
-                       futures: Seq[CompletableFuture[T]],
-                       responseCallback: () => Unit)
-  extends DelayedOperation(timeoutMs) with Logging {
-
-  /**
-   * The operation can be completed if all the futures have completed 
successfully
-   * or failed with exceptions.
-   */
-  override def tryComplete() : Boolean = {
-    trace(s"Trying to complete operation for ${futures.size} futures")
-
-    val pending = futures.count(future => !future.isDone)
-    if (pending == 0) {
-      trace("All futures have been completed or have errors, completing the 
delayed operation")
-      forceComplete()
-    } else {
-      trace(s"$pending future still pending, not completing the delayed 
operation")
-      false
-    }
-  }
-
-  /**
-   * Timeout any pending futures and invoke responseCallback. This is invoked 
when all
-   * futures have completed or the operation has timed out.
-   */
-  override def onComplete(): Unit = {
-    val pendingFutures = futures.filterNot(_.isDone)
-    trace(s"Completing operation for ${futures.size} futures, expired 
${pendingFutures.size}")
-    pendingFutures.foreach(_.completeExceptionally(new 
TimeoutException(s"Request has been timed out after $timeoutMs ms")))
-    responseCallback.apply()
-  }
-
-  /**
-   * This is invoked after onComplete(), so no actions required.
-   */
-  override def onExpiration(): Unit = {
-  }
-}
-
-class DelayedFuturePurgatory(purgatoryName: String, brokerId: Int) {
-  private val purgatory = new 
DelayedOperationPurgatory[DelayedFuture[_]](purgatoryName, brokerId)
-  private val executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
-    new LinkedBlockingQueue[Runnable](),
-    new ThreadFactory {
-      override def newThread(r: Runnable): Thread = new 
KafkaThread(s"DelayedExecutor-$purgatoryName", r, true)
-    })
-  private val purgatoryKey = new DelayedOperationKey() {
-    override def keyLabel(): String = "delayed-future-key"
-  }
-
-  def tryCompleteElseWatch[T](timeoutMs: Long,
-                              futures: Seq[CompletableFuture[T]],
-                              responseCallback: () => Unit): DelayedFuture[T] 
= {
-    val delayedFuture = new DelayedFuture[T](timeoutMs, futures, 
responseCallback)
-    val done = purgatory.tryCompleteElseWatch(delayedFuture, 
util.Collections.singletonList(purgatoryKey))
-    if (!done) {
-      val callbackAction = new BiConsumer[Void, Throwable]() {
-        override def accept(result: Void, exception: Throwable): Unit = 
delayedFuture.forceComplete()
-      }
-      CompletableFuture.allOf(futures.toArray: 
_*).whenCompleteAsync(callbackAction, executor)
-    }
-    delayedFuture
-  }
-
-  def shutdown(): Unit = {
-    executor.shutdownNow()
-    executor.awaitTermination(60, TimeUnit.SECONDS)
-    purgatory.shutdown()
-  }
-
-  def isShutdown: Boolean = executor.isShutdown
-}
diff --git 
a/core/src/test/scala/integration/kafka/server/DelayedFutureTest.scala 
b/core/src/test/scala/integration/kafka/server/DelayedFutureTest.scala
deleted file mode 100644
index e9313159493..00000000000
--- a/core/src/test/scala/integration/kafka/server/DelayedFutureTest.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package integration.kafka.server
-
-import kafka.server.DelayedFuturePurgatory
-import kafka.utils.TestUtils
-import org.apache.kafka.common.utils.Time
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertThrows, assertTrue}
-import org.junit.jupiter.api.Test
-
-import java.util.concurrent.{CompletableFuture, ExecutionException}
-import java.util.concurrent.atomic.AtomicInteger
-import scala.jdk.CollectionConverters.CollectionHasAsScala
-
-class DelayedFutureTest {
-
-  @Test
-  def testDelayedFuture(): Unit = {
-    val purgatoryName = "testDelayedFuture"
-    val purgatory = new DelayedFuturePurgatory(purgatoryName, brokerId = 0)
-    try {
-      val result = new AtomicInteger()
-
-      def hasExecutorThread: Boolean = 
Thread.getAllStackTraces.keySet.asScala.map(_.getName)
-        .exists(_.contains(s"DelayedExecutor-$purgatoryName"))
-
-      def updateResult(futures: List[CompletableFuture[Integer]]): Unit =
-        
result.set(futures.filterNot(_.isCompletedExceptionally).map(_.get.intValue).sum)
-
-      assertFalse(hasExecutorThread, "Unnecessary thread created")
-
-      // Two completed futures: callback should be executed immediately on the 
same thread
-      val futures1 = 
List(CompletableFuture.completedFuture(10.asInstanceOf[Integer]),
-        CompletableFuture.completedFuture(11.asInstanceOf[Integer]))
-      val r1 = purgatory.tryCompleteElseWatch[Integer](100000L, futures1, () 
=> updateResult(futures1))
-      assertTrue(r1.isCompleted, "r1 not completed")
-      assertEquals(21, result.get())
-      assertFalse(hasExecutorThread, "Unnecessary thread created")
-
-      // Two delayed futures: callback should wait for both to complete
-      result.set(-1)
-      val futures2 = List(new CompletableFuture[Integer], new 
CompletableFuture[Integer])
-      val r2 = purgatory.tryCompleteElseWatch[Integer](100000L, futures2, () 
=> updateResult(futures2))
-      assertFalse(r2.isCompleted, "r2 should be incomplete")
-      futures2.head.complete(20)
-      assertFalse(r2.isCompleted)
-      assertEquals(-1, result.get())
-      futures2(1).complete(21)
-      TestUtils.waitUntilTrue(() => r2.isCompleted, "r2 not completed")
-      TestUtils.waitUntilTrue(() => result.get == 41, "callback not invoked")
-      assertTrue(hasExecutorThread, "Thread not created for executing delayed 
task")
-
-      // One immediate and one delayed future: callback should wait for 
delayed task to complete
-      result.set(-1)
-      val futures3 = List(new CompletableFuture[Integer], 
CompletableFuture.completedFuture(31.asInstanceOf[Integer]))
-      val r3 = purgatory.tryCompleteElseWatch[Integer](100000L, futures3, () 
=> updateResult(futures3))
-      assertFalse(r3.isCompleted, "r3 should be incomplete")
-      assertEquals(-1, result.get())
-      futures3.head.complete(30)
-      TestUtils.waitUntilTrue(() => r3.isCompleted, "r3 not completed")
-      TestUtils.waitUntilTrue(() => result.get == 61, "callback not invoked")
-
-      // One future doesn't complete within timeout. Should expire and invoke 
callback after timeout.
-      result.set(-1)
-      val start = Time.SYSTEM.hiResClockMs
-      val expirationMs = 2000L
-      val futures4 = List(new CompletableFuture[Integer], new 
CompletableFuture[Integer])
-      val r4 = purgatory.tryCompleteElseWatch[Integer](expirationMs, futures4, 
() => updateResult(futures4))
-      futures4.head.complete(40)
-      TestUtils.waitUntilTrue(() => futures4(1).isDone, "r4 futures not 
expired")
-      assertTrue(r4.isCompleted, "r4 not completed after timeout")
-      val elapsed = Time.SYSTEM.hiResClockMs - start
-      assertTrue(elapsed >= expirationMs, s"Time for expiration $elapsed 
should at least $expirationMs")
-      assertEquals(40, futures4.head.get)
-      assertEquals(classOf[org.apache.kafka.common.errors.TimeoutException],
-        assertThrows(classOf[ExecutionException], () => 
futures4(1).get).getCause.getClass)
-      assertEquals(40, result.get())
-    } finally {
-      purgatory.shutdown()
-    }
-  }
-}
diff --git 
a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuture.java 
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuture.java
new file mode 100644
index 00000000000..b9fde063f65
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuture.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A delayed operation using CompletableFutures that can be created by 
KafkaApis and watched
+ * in a DelayedFuturePurgatory. This is used for ACL updates using 
async Authorizers.
+ */
+public class DelayedFuture<T> extends DelayedOperation {
+
+    private final Logger log = new 
LogContext().logger(DelayedFuture.class.getName());
+    private final List<CompletableFuture<T>> futures;
+    private final Runnable responseCallback;
+    private final long timeoutMs;
+
+    public DelayedFuture(long timeoutMs, List<CompletableFuture<T>> futures, 
Runnable responseCallback) {
+        super(timeoutMs);
+        this.timeoutMs = timeoutMs;
+        this.futures = futures;
+        this.responseCallback = responseCallback;
+    }
+
+    /**
+     * The operation can be completed if all the futures have completed 
successfully
+     * or failed with exceptions.
+     */
+    @Override
+    public boolean tryComplete() {
+        log.trace("Trying to complete operation for {} futures", 
futures.size());
+
+        long pending = futures.stream().filter(future -> 
!future.isDone()).count();
+        if (pending == 0) {
+            log.trace("All futures have been completed or have errors, 
completing the delayed operation");
+            return forceComplete();
+        } else {
+            log.trace("{} future still pending, not completing the delayed 
operation", pending);
+            return false;
+        }
+    }
+
+    /**
+     * Time out any pending futures and invoke responseCallback. This is 
invoked when all
+     * futures have completed or the operation has timed out.
+     */
+    @Override
+    public void onComplete() {
+        List<CompletableFuture<T>> pendingFutures = 
futures.stream().filter(future -> !future.isDone()).toList();
+        log.trace("Completing operation for {} futures, expired {}", 
futures.size(), pendingFutures.size());
+        pendingFutures.forEach(future -> future.completeExceptionally(new 
TimeoutException("Request has been timed out after " + timeoutMs + " ms")));
+        responseCallback.run();
+    }
+
+    /**
+     * This is invoked after onComplete(), so no actions required.
+     */
+    @Override
+    public void onExpiration() {
+        // This is invoked after onComplete(), so no actions required.
+    }
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuturePurgatory.java
 
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuturePurgatory.java
new file mode 100644
index 00000000000..637fdb1aab6
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedFuturePurgatory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.utils.KafkaThread;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+public class DelayedFuturePurgatory {
+    private final DelayedOperationPurgatory<DelayedFuture<?>> purgatory;
+    private final ThreadPoolExecutor executor;
+    private final DelayedOperationKey purgatoryKey;
+
+    public DelayedFuturePurgatory(String purgatoryName, int brokerId) {
+        this.purgatory = new DelayedOperationPurgatory<>(purgatoryName, 
brokerId);
+        this.executor = new ThreadPoolExecutor(
+            1,
+            1,
+            0,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(),
+            r -> new KafkaThread("DelayedExecutor-" + purgatoryName, r, true));
+        this.purgatoryKey = () -> "delayed-future-key";
+    }
+
+    public <T> DelayedFuture<T> tryCompleteElseWatch(
+        long timeoutMs,
+        List<CompletableFuture<T>> futures,
+        Runnable responseCallback
+    ) {
+        DelayedFuture<T> delayedFuture = new DelayedFuture<>(timeoutMs, 
futures, responseCallback);
+        boolean done = purgatory.tryCompleteElseWatch(delayedFuture, 
List.of(purgatoryKey));
+        if (!done) {
+            BiConsumer<Void, Throwable> callbackAction = (result, exception) 
-> delayedFuture.forceComplete();
+            CompletableFuture.allOf(futures.toArray(new 
CompletableFuture<?>[0])).whenCompleteAsync(callbackAction, executor);
+        }
+        return delayedFuture;
+    }
+
+    public void shutdown() throws Exception {
+        executor.shutdownNow();
+        executor.awaitTermination(60, TimeUnit.SECONDS);
+        purgatory.shutdown();
+    }
+
+    public boolean isShutdown() {
+        return executor.isShutdown();
+    }
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java 
b/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java
new file mode 100644
index 00000000000..80efa9930c4
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DelayedFutureTest {
+
+    @Test
+    void testDelayedFuture() throws Exception {
+        String purgatoryName = "testDelayedFuture";
+        DelayedFuturePurgatory purgatory = new 
DelayedFuturePurgatory(purgatoryName, 0);
+        try {
+            AtomicInteger result = new AtomicInteger();
+
+            Supplier<Boolean> hasExecutorThread = () -> 
Thread.getAllStackTraces().keySet().stream()
+                .map(Thread::getName)
+                .anyMatch(name -> name.contains("DelayedExecutor-" + 
purgatoryName));
+
+            Function<List<CompletableFuture<Integer>>, Void> updateResult = 
futures -> {
+                result.set(futures.stream()
+                    
.filter(Predicate.not(CompletableFuture::isCompletedExceptionally))
+                    .mapToInt(future -> assertDoesNotThrow(() -> future.get()))
+                    .sum());
+                return null;
+            };
+
+            assertFalse(hasExecutorThread.get(), "Unnecessary thread created");
+
+            // Two completed futures: callback should be executed immediately 
on the same thread
+            List<CompletableFuture<Integer>> futures1 = List.of(
+                CompletableFuture.completedFuture(10),
+                CompletableFuture.completedFuture(11)
+            );
+            DelayedFuture<Integer> r1 = 
purgatory.tryCompleteElseWatch(100000L, futures1, () -> 
updateResult.apply(futures1));
+            assertTrue(r1.isCompleted(), "r1 not completed");
+            assertEquals(21, result.get());
+            assertFalse(hasExecutorThread.get(), "Unnecessary thread created");
+
+            // Two delayed futures: callback should wait for both to complete
+            result.set(-1);
+            List<CompletableFuture<Integer>> futures2 = List.of(new 
CompletableFuture<>(), new CompletableFuture<>());
+            DelayedFuture<Integer> r2 = 
purgatory.tryCompleteElseWatch(100000L, futures2, () -> 
updateResult.apply(futures2));
+            assertFalse(r2.isCompleted(), "r2 should be incomplete");
+            futures2.get(0).complete(20);
+            assertFalse(r2.isCompleted());
+            assertEquals(-1, result.get());
+            futures2.get(1).complete(21);
+            TestUtils.waitForCondition(r2::isCompleted, "r2 not completed");
+            TestUtils.waitForCondition(() -> result.get() == 41, "callback not 
invoked");
+            assertTrue(hasExecutorThread.get(), "Thread not created for 
executing delayed task");
+
+            // One immediate and one delayed future: callback should wait for 
delayed task to complete
+            result.set(-1);
+            List<CompletableFuture<Integer>> futures3 = List.of(
+                new CompletableFuture<>(),
+                CompletableFuture.completedFuture(31)
+            );
+            DelayedFuture<Integer> r3 = 
purgatory.tryCompleteElseWatch(100000L, futures3, () -> 
updateResult.apply(futures3));
+            assertFalse(r3.isCompleted(), "r3 should be incomplete");
+            assertEquals(-1, result.get());
+            futures3.get(0).complete(30);
+            TestUtils.waitForCondition(r3::isCompleted, "r3 not completed");
+            TestUtils.waitForCondition(() -> result.get() == 61, "callback not 
invoked");
+
+            // One future doesn't complete within timeout. Should expire and 
invoke callback after timeout.
+            result.set(-1);
+            long start = Time.SYSTEM.hiResClockMs();
+            long expirationMs = 2000L;
+            List<CompletableFuture<Integer>> futures4 = List.of(new 
CompletableFuture<>(), new CompletableFuture<>());
+            DelayedFuture<Integer> r4 = 
purgatory.tryCompleteElseWatch(expirationMs, futures4, () -> 
updateResult.apply(futures4));
+            futures4.get(0).complete(40);
+            TestUtils.waitForCondition(() -> futures4.get(1).isDone(), "r4 
futures not expired");
+            assertTrue(r4.isCompleted(), "r4 not completed after timeout");
+            long elapsed = Time.SYSTEM.hiResClockMs() - start;
+            assertTrue(elapsed >= expirationMs, "Time for expiration " + 
elapsed + " should at least " + expirationMs);
+            assertEquals(40, futures4.get(0).get());
+            Exception exception = assertThrows(ExecutionException.class, () -> 
futures4.get(1).get());
+            assertEquals(TimeoutException.class, 
exception.getCause().getClass());
+            assertEquals(40, result.get());
+        } finally {
+            purgatory.shutdown();
+        }
+    }
+}

Reply via email to