This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 00def717814fd4bee970292dfbf7a0242d10c553 Author: Ilya Brin <[email protected]> AuthorDate: Wed Jan 14 07:57:14 2026 -0600 [fix][fn] complete flushAsync before closeAsync in ProducerCache and wait for completion in closing the cache (#25140) Co-authored-by: Lari Hotari <[email protected]> (cherry picked from commit 21819c67c8fa322ef82371d753490d5b4c5026c5) --- .../pulsar/functions/instance/ProducerCache.java | 31 +++++++++++++++---- .../functions/instance/ProducerCacheTest.java | 36 ++++++++++++++++++++++ 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java index df3bc9f8c9e..9a591cce3a8 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java @@ -22,12 +22,16 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; +import io.netty.util.concurrent.DefaultThreadFactory; import java.io.Closeable; import java.time.Duration; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; @@ -60,16 +64,20 @@ public class ProducerCache implements Closeable { private final Cache<ProducerCacheKey, Producer<?>> cache; private final AtomicBoolean closed = new AtomicBoolean(false); - private final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures = new CopyOnWriteArrayList<>(); + @VisibleForTesting + final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures = new CopyOnWriteArrayList<>(); + private final ExecutorService cacheExecutor; public ProducerCache() { + cacheExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("ProducerCache")); Caffeine<ProducerCacheKey, Producer<?>> builder = Caffeine.newBuilder() .recordStats() .scheduler(Scheduler.systemScheduler()) + .executor(cacheExecutor) .<ProducerCacheKey, Producer<?>>removalListener((key, producer, cause) -> { log.info("Closing producer for topic {}, cause {}", key.topic(), cause); CompletableFuture closeFuture = - CompletableFuture.supplyAsync(() -> producer.flushAsync(), Runnable::run) + producer.flushAsync() .orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS) .exceptionally(ex -> { Throwable unwrappedCause = FutureUtil.unwrapCompletionException(ex); @@ -131,10 +139,21 @@ public class ProducerCache implements Closeable { public void close() { if (closed.compareAndSet(false, true)) { cache.invalidateAll(); - try { - FutureUtil.waitForAll(closeFutures).get(); - } catch (InterruptedException | ExecutionException e) { - log.warn("Failed to close producers", e); + // schedule the waiting job on the cache executor + cacheExecutor.execute(() -> { + try { + FutureUtil.waitForAll(closeFutures).get(); + } catch (InterruptedException | ExecutionException e) { + log.warn("Failed to close producers", e); + } + }); + // Wait for the cache executor to terminate. + // The eviction jobs and waiting for the close futures to complete will run on the single-threaded + // cache executor, so we need to wait for them to finish to ensure that the cache is closed properly. + boolean terminated = MoreExecutors.shutdownAndAwaitTermination(cacheExecutor, + Duration.ofSeconds(FLUSH_OR_CLOSE_TIMEOUT_SECONDS)); + if (!terminated) { + log.warn("Failed to shutdown cache executor gracefully."); } } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java index af95a7901b6..129651c8804 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java @@ -20,7 +20,10 @@ package org.apache.pulsar.functions.instance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.testng.annotations.Test; @@ -61,4 +64,37 @@ public class ProducerCacheTest { cache.close(); } + @Test + public void shouldCompleteFlushBeforeCloseAndWaitForClosing() { + ProducerCache cache = new ProducerCache(); + Producer producer = mock(Producer.class); + AtomicBoolean flushCompleted = new AtomicBoolean(false); + when(producer.flushAsync()).thenReturn(CompletableFuture.supplyAsync(() -> { + try { + // add delay to make sure that cache close waits for completion of flushAsync + Thread.sleep(100); + } catch (InterruptedException e) { + // ignore + } + flushCompleted.set(true); + return null; + })); + AtomicBoolean closeCompleted = new AtomicBoolean(false); + when(producer.closeAsync()).thenReturn(CompletableFuture.supplyAsync(() -> { + try { + // add delay to make sure that cache close waits for completion of closeAsync + Thread.sleep(100); + } catch (InterruptedException e) { + // ignore + } + closeCompleted.set(true); + return null; + })); + cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, "topic", "key", + () -> (Producer<Object>) producer); + cache.close(); + assertTrue(flushCompleted.get()); + assertTrue(closeCompleted.get()); + assertEquals(cache.closeFutures.size(), 1); + } } \ No newline at end of file
