This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 21819c67c8f [fix][fn] complete flushAsync before closeAsync in ProducerCache and wait for completion in closing the cache (#25140)
21819c67c8f is described below
commit 21819c67c8fa322ef82371d753490d5b4c5026c5
Author: Ilya Brin <[email protected]>
AuthorDate: Wed Jan 14 07:57:14 2026 -0600
[fix][fn] complete flushAsync before closeAsync in ProducerCache and wait
for completion in closing the cache (#25140)
Co-authored-by: Lari Hotari <[email protected]>
---
.../pulsar/functions/instance/ProducerCache.java | 31 +++++++++++++++----
.../functions/instance/ProducerCacheTest.java | 36 ++++++++++++++++++++++
2 files changed, 61 insertions(+), 6 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
index df3bc9f8c9e..9a591cce3a8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
@@ -22,12 +22,16 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Closeable;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
@@ -60,16 +64,20 @@ public class ProducerCache implements Closeable {
private final Cache<ProducerCacheKey, Producer<?>> cache;
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures =
new CopyOnWriteArrayList<>();
+ @VisibleForTesting
+ final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures = new
CopyOnWriteArrayList<>();
+ private final ExecutorService cacheExecutor;
public ProducerCache() {
+ cacheExecutor = Executors.newSingleThreadExecutor(new
DefaultThreadFactory("ProducerCache"));
Caffeine<ProducerCacheKey, Producer<?>> builder = Caffeine.newBuilder()
.recordStats()
.scheduler(Scheduler.systemScheduler())
+ .executor(cacheExecutor)
.<ProducerCacheKey, Producer<?>>removalListener((key,
producer, cause) -> {
log.info("Closing producer for topic {}, cause {}",
key.topic(), cause);
CompletableFuture closeFuture =
- CompletableFuture.supplyAsync(() ->
producer.flushAsync(), Runnable::run)
+ producer.flushAsync()
.orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS,
TimeUnit.SECONDS)
.exceptionally(ex -> {
Throwable unwrappedCause =
FutureUtil.unwrapCompletionException(ex);
@@ -131,10 +139,21 @@ public class ProducerCache implements Closeable {
public void close() {
if (closed.compareAndSet(false, true)) {
cache.invalidateAll();
- try {
- FutureUtil.waitForAll(closeFutures).get();
- } catch (InterruptedException | ExecutionException e) {
- log.warn("Failed to close producers", e);
+ // schedule the waiting job on the cache executor
+ cacheExecutor.execute(() -> {
+ try {
+ FutureUtil.waitForAll(closeFutures).get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Failed to close producers", e);
+ }
+ });
+ // Wait for the cache executor to terminate.
+ // The eviction jobs and waiting for the close futures to complete will run on the single-threaded
+ // cache executor, so we need to wait for them to finish to ensure that the cache is closed properly.
+ boolean terminated =
MoreExecutors.shutdownAndAwaitTermination(cacheExecutor,
+ Duration.ofSeconds(FLUSH_OR_CLOSE_TIMEOUT_SECONDS));
+ if (!terminated) {
+ log.warn("Failed to shutdown cache executor gracefully.");
}
}
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java
index af95a7901b6..129651c8804 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.functions.instance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.testng.annotations.Test;
@@ -61,4 +64,37 @@ public class ProducerCacheTest {
cache.close();
}
+ @Test
+ public void shouldCompleteFlushBeforeCloseAndWaitForClosing() {
+ ProducerCache cache = new ProducerCache();
+ Producer producer = mock(Producer.class);
+ AtomicBoolean flushCompleted = new AtomicBoolean(false);
+
when(producer.flushAsync()).thenReturn(CompletableFuture.supplyAsync(() -> {
+ try {
+ // add delay to make sure that cache close waits for completion of flushAsync
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ flushCompleted.set(true);
+ return null;
+ }));
+ AtomicBoolean closeCompleted = new AtomicBoolean(false);
+
when(producer.closeAsync()).thenReturn(CompletableFuture.supplyAsync(() -> {
+ try {
+ // add delay to make sure that cache close waits for completion of closeAsync
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ closeCompleted.set(true);
+ return null;
+ }));
+ cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE,
"topic", "key",
+ () -> (Producer<Object>) producer);
+ cache.close();
+ assertTrue(flushCompleted.get());
+ assertTrue(closeCompleted.get());
+ assertEquals(cache.closeFutures.size(), 1);
+ }
}
\ No newline at end of file