This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b04995d4239e231e0f8bc91a3ebf4262906c957b
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Tue Jun 25 08:37:43 2024 +0300

    [improve][fn] Make producer cache bounded and expiring in Functions/Connectors (#22945)
    
    (cherry picked from commit 6fe8100b1fd5d37a6e1bf33803a8904fa3879321)
---
 pulsar-functions/instance/pom.xml                  |   5 +
 .../pulsar/functions/instance/ContextImpl.java     |  84 +++----------
 .../functions/instance/JavaInstanceRunnable.java   |   8 +-
 .../pulsar/functions/instance/ProducerCache.java   | 130 ++++++++++++++++++++
 .../apache/pulsar/functions/sink/PulsarSink.java   |  89 +++++---------
 .../pulsar/functions/instance/ContextImplTest.java |  24 ++--
 .../pulsar/functions/sink/PulsarSinkTest.java      | 132 +++++++++------------
 7 files changed, 267 insertions(+), 205 deletions(-)

diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index 3ead464a99d..ddc59937b68 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -157,6 +157,11 @@
       <artifactId>guava</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>info.picocli</groupId>
       <artifactId>picocli</artifactId>
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index eeeaa8b3627..f613f749bd0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -29,16 +29,15 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
@@ -85,6 +84,7 @@ import org.slf4j.Logger;
 /**
  * This class implements the Context interface exposed to the user.
  */
+@Slf4j
 @ToString(exclude = {"pulsarAdmin"})
 class ContextImpl implements Context, SinkContext, SourceContext, 
AutoCloseable {
     private final ProducerBuilderFactory producerBuilderFactory;
@@ -98,8 +98,6 @@ class ContextImpl implements Context, SinkContext, 
SourceContext, AutoCloseable
     private final ClientBuilder clientBuilder;
     private final PulsarClient client;
     private final PulsarAdmin pulsarAdmin;
-    private Map<String, Producer<?>> publishProducers;
-    private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;
 
     private final TopicSchema topicSchema;
 
@@ -139,12 +137,15 @@ class ContextImpl implements Context, SinkContext, 
SourceContext, AutoCloseable
 
     private final java.util.function.Consumer<Throwable> fatalHandler;
 
+    private final ProducerCache producerCache;
+    private final boolean useThreadLocalProducers;
+
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient 
client,
                        SecretsProvider secretsProvider, 
FunctionCollectorRegistry collectorRegistry,
                        String[] metricsLabels,
                        Function.FunctionDetails.ComponentType componentType, 
ComponentStatsManager statsManager,
                        StateManager stateManager, PulsarAdmin pulsarAdmin, 
ClientBuilder clientBuilder,
-                       java.util.function.Consumer<Throwable> fatalHandler) {
+                       java.util.function.Consumer<Throwable> fatalHandler, 
ProducerCache producerCache) {
         this.config = config;
         this.logger = logger;
         this.clientBuilder = clientBuilder;
@@ -154,14 +155,17 @@ class ContextImpl implements Context, SinkContext, 
SourceContext, AutoCloseable
         this.statsManager = statsManager;
         this.fatalHandler = fatalHandler;
 
-        boolean useThreadLocalProducers = false;
+        this.producerCache = producerCache;
 
         Function.ProducerSpec producerSpec = 
config.getFunctionDetails().getSink().getProducerSpec();
         ProducerConfig producerConfig = null;
         if (producerSpec != null) {
             producerConfig = 
FunctionConfigUtils.convertProducerSpecToProducerConfig(producerSpec);
             useThreadLocalProducers = 
producerSpec.getUseThreadLocalProducers();
+        } else {
+            useThreadLocalProducers = false;
         }
+
         producerBuilderFactory = new ProducerBuilderFactory(client, 
producerConfig,
                 Thread.currentThread().getContextClassLoader(),
                 // This is for backwards compatibility. The PR 
https://github.com/apache/pulsar/pull/19470 removed
@@ -175,12 +179,6 @@ class ContextImpl implements Context, SinkContext, 
SourceContext, AutoCloseable
                         this.config.getFunctionDetails().getName()),
                 this.config.getInstanceId()));
 
-        if (useThreadLocalProducers) {
-            tlPublishProducers = new ThreadLocal<>();
-        } else {
-            publishProducers = new ConcurrentHashMap<>();
-        }
-
         if (config.getFunctionDetails().getUserConfig().isEmpty()) {
             userConfigs = new HashMap<>();
         } else {
@@ -543,39 +541,15 @@ class ContextImpl implements Context, SinkContext, 
SourceContext, AutoCloseable
     }
 
     private <T> Producer<T> getProducer(String topicName, Schema<T> schema) 
throws PulsarClientException {
-        Producer<T> producer;
-        if (tlPublishProducers != null) {
-            Map<String, Producer<?>> producerMap = tlPublishProducers.get();
-            if (producerMap == null) {
-                producerMap = new HashMap<>();
-                tlPublishProducers.set(producerMap);
-            }
-            producer = (Producer<T>) producerMap.get(topicName);
-        } else {
-            producer = (Producer<T>) publishProducers.get(topicName);
-        }
-
-        if (producer == null) {
-            Producer<T> newProducer = producerBuilderFactory
-                    .createProducerBuilder(topicName, schema, null)
-                    .properties(producerProperties)
-                    .create();
-
-            if (tlPublishProducers != null) {
-                tlPublishProducers.get().put(topicName, newProducer);
-            } else {
-                Producer<T> existingProducer = (Producer<T>) 
publishProducers.putIfAbsent(topicName, newProducer);
-
-                if (existingProducer != null) {
-                    // The value in the map was not updated after the 
concurrent put
-                    newProducer.close();
-                    producer = existingProducer;
-                } else {
-                    producer = newProducer;
-                }
-            }
-        }
-        return producer;
+        Long additionalCacheKey = useThreadLocalProducers ? 
Thread.currentThread().getId() : null;
+        return 
producerCache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE,
+                topicName, additionalCacheKey, () -> {
+                    log.info("Initializing producer on topic {} with schema 
{}", topicName, schema);
+                    return producerBuilderFactory
+                            .createProducerBuilder(topicName, schema, null)
+                            .properties(producerProperties)
+                            .create();
+                });
     }
 
     public Map<String, Double> getAndResetMetrics() {
@@ -714,29 +688,9 @@ class ContextImpl implements Context, SinkContext, 
SourceContext, AutoCloseable
 
     @Override
     public void close() {
-        List<CompletableFuture> futures = new LinkedList<>();
-
-        if (publishProducers != null) {
-            for (Producer<?> producer : publishProducers.values()) {
-                futures.add(producer.closeAsync());
-            }
-        }
-
-        if (tlPublishProducers != null) {
-            for (Producer<?> producer : tlPublishProducers.get().values()) {
-                futures.add(producer.closeAsync());
-            }
-        }
-
         if (pulsarAdmin != null) {
             pulsarAdmin.close();
         }
-
-        try {
-            CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).get();
-        } catch (InterruptedException | ExecutionException e) {
-            logger.warn("Failed to close producers", e);
-        }
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index f1b9af00f9d..baf0c5f7400 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -168,6 +168,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     private final AtomicReference<Schema<?>> sinkSchema = new 
AtomicReference<>();
     private SinkSchemaInfoProvider sinkSchemaInfoProvider = null;
 
+    private final ProducerCache producerCache = new ProducerCache();
+
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 ClientBuilder clientBuilder,
                                 PulsarClient pulsarClient,
@@ -292,7 +294,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
             Thread.currentThread().setContextClassLoader(functionClassLoader);
             return new ContextImpl(instanceConfig, instanceLog, client, 
secretsProvider,
                 collectorRegistry, metricsLabels, this.componentType, 
this.stats, stateManager,
-                pulsarAdmin, clientBuilder, fatalHandler);
+                pulsarAdmin, clientBuilder, fatalHandler, producerCache);
         } finally {
             Thread.currentThread().setContextClassLoader(clsLoader);
         }
@@ -607,6 +609,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
         instanceCache = null;
 
+        producerCache.close();
+
         if (logAppender != null) {
             removeLogTopicAppender(LoggerContext.getContext());
             removeLogTopicAppender(LoggerContext.getContext(false));
@@ -1050,7 +1054,7 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                 }
 
                 object = new PulsarSink(this.client, pulsarSinkConfig, 
this.properties, this.stats,
-                        this.functionClassLoader);
+                        this.functionClassLoader, this.producerCache);
             }
         } else {
             object = Reflections.createInstance(
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
new file mode 100644
index 00000000000..f68c4e95895
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class ProducerCache implements Closeable {
+    // allow tuning the cache timeout with PRODUCER_CACHE_TIMEOUT_SECONDS env 
variable
+    private static final int PRODUCER_CACHE_TIMEOUT_SECONDS =
+            
Integer.parseInt(System.getenv().getOrDefault("PRODUCER_CACHE_TIMEOUT_SECONDS", 
"300"));
+    // allow tuning the cache size with PRODUCER_CACHE_MAX_SIZE env variable
+    private static final int PRODUCER_CACHE_MAX_SIZE =
+            
Integer.parseInt(System.getenv().getOrDefault("PRODUCER_CACHE_MAX_SIZE", 
"10000"));
+    private static final int FLUSH_OR_CLOSE_TIMEOUT_SECONDS = 60;
+
+    // prevents the different producers created in different code locations 
from mixing up
+    public enum CacheArea {
+        // producers created by calling Context, SinkContext, SourceContext 
methods
+        CONTEXT_CACHE,
+        // producers created in Pulsar Sources, multiple topics are possible 
by returning destination topics
+        // by SinkRecord.getDestinationTopic call
+        SINK_RECORD_CACHE,
+    }
+
+    record ProducerCacheKey(CacheArea cacheArea, String topic, Object 
additionalKey) {
+    }
+
+    private final Cache<ProducerCacheKey, Producer<?>> cache;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures = 
new CopyOnWriteArrayList<>();
+
+    public ProducerCache() {
+        Caffeine<ProducerCacheKey, Producer> builder = Caffeine.newBuilder()
+                .scheduler(Scheduler.systemScheduler())
+                .<ProducerCacheKey, Producer>removalListener((key, producer, 
cause) -> {
+                    log.info("Closing producer for topic {}, cause {}", 
key.topic(), cause);
+                    CompletableFuture closeFuture =
+                            producer.flushAsync()
+                                    .orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS)
+                                    .exceptionally(ex -> {
+                                        log.error("Error flushing producer for 
topic {}", key.topic(), ex);
+                                        return null;
+                                    }).thenCompose(__ ->
+                                            
producer.closeAsync().orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS,
+                                                            TimeUnit.SECONDS)
+                                                    .exceptionally(ex -> {
+                                                        log.error("Error 
closing producer for topic {}", key.topic(),
+                                                                ex);
+                                                        return null;
+                                                    }));
+                    if (closed.get()) {
+                        closeFutures.add(closeFuture);
+                    }
+                })
+                .weigher((key, producer) -> 
Math.max(producer.getNumOfPartitions(), 1))
+                .maximumWeight(PRODUCER_CACHE_MAX_SIZE);
+        if (PRODUCER_CACHE_TIMEOUT_SECONDS > 0) {
+            
builder.expireAfterAccess(Duration.ofSeconds(PRODUCER_CACHE_TIMEOUT_SECONDS));
+        }
+        cache = builder.build();
+    }
+
+    public <T> Producer<T> getOrCreateProducer(CacheArea cacheArea, String 
topicName, Object additionalCacheKey,
+                                               Callable<Producer<T>> supplier) 
{
+        if (closed.get()) {
+            throw new IllegalStateException("ProducerCache is already closed");
+        }
+        return (Producer<T>) cache.get(new ProducerCacheKey(cacheArea, 
topicName, additionalCacheKey), key -> {
+            try {
+                return supplier.call();
+            } catch (RuntimeException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new RuntimeException("Unable to create producer for 
topic '" + topicName + "'", e);
+            }
+        });
+    }
+
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            cache.invalidateAll();
+            try {
+                FutureUtil.waitForAll(closeFutures).get();
+            } catch (InterruptedException | ExecutionException e) {
+                log.warn("Failed to close producers", e);
+            }
+        }
+    }
+
+    @VisibleForTesting
+    public boolean containsKey(CacheArea cacheArea, String topic) {
+        return containsKey(cacheArea, topic, null);
+    }
+
+    @VisibleForTesting
+    public boolean containsKey(CacheArea cacheArea, String topic, Object 
additionalCacheKey) {
+        return cache.getIfPresent(new ProducerCacheKey(cacheArea, topic, 
additionalCacheKey)) != null;
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 18e55e8e84d..da6b8006eb9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -20,19 +20,15 @@ package org.apache.pulsar.functions.sink;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Base64;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -48,6 +44,7 @@ import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.AbstractSinkRecord;
 import org.apache.pulsar.functions.instance.ProducerBuilderFactory;
+import org.apache.pulsar.functions.instance.ProducerCache;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.TopicSchema;
@@ -62,6 +59,7 @@ public class PulsarSink<T> implements Sink<T> {
     private final Map<String, String> properties;
     private final ClassLoader functionClassLoader;
     private ComponentStatsManager stats;
+    private final ProducerCache producerCache;
 
     @VisibleForTesting
     PulsarSinkProcessor<T> pulsarSinkProcessor;
@@ -80,43 +78,25 @@ public class PulsarSink<T> implements Sink<T> {
     }
 
     abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
-        protected Map<String, Producer<T>> publishProducers = new 
ConcurrentHashMap<>();
-
         protected Producer<T> getProducer(String destinationTopic, Schema 
schema) {
-            return getProducer(destinationTopic, null, destinationTopic, 
schema);
+            return getProducer(destinationTopic, schema, null, null);
         }
 
-        protected Producer<T> getProducer(String producerId, String 
producerName, String topicName, Schema schema) {
-            return publishProducers.computeIfAbsent(producerId, s -> {
-                try {
-                    log.info("Initializing producer {} on topic {} with schema 
{}",
-                        producerName, topicName, schema);
-                    Producer<T> producer = createProducer(
-                            topicName,
-                            schema, producerName
-                    );
-                    log.info("Initialized producer {} on topic {} with schema 
{}: {} -> {}",
-                        producerName, topicName, schema, producerId, producer);
-                    return producer;
-                } catch (PulsarClientException e) {
-                    log.error("Failed to create Producer while doing user 
publish", e);
-                    throw new RuntimeException(e);
-                }
-            });
+        protected Producer<T> getProducer(String topicName, Schema schema, 
String producerName, String partitionId) {
+            return 
producerCache.getOrCreateProducer(ProducerCache.CacheArea.SINK_RECORD_CACHE, 
topicName, partitionId,
+                    () -> {
+                        Producer<T> producer = createProducer(topicName, 
schema, producerName);
+                        log.info(
+                                "Initialized producer with name '{}' on topic 
'{}' with schema {} partitionId {} "
+                                        + "-> {}",
+                                producerName, topicName, schema, partitionId, 
producer);
+                        return producer;
+                    });
         }
 
         @Override
         public void close() throws Exception {
-            List<CompletableFuture<Void>> closeFutures = new 
ArrayList<>(publishProducers.size());
-            for (Map.Entry<String, Producer<T>> entry : 
publishProducers.entrySet()) {
-                Producer<T> producer = entry.getValue();
-                closeFutures.add(producer.closeAsync());
-            }
-            try {
-                
org.apache.pulsar.common.util.FutureUtil.waitForAll(closeFutures);
-            } catch (Exception e) {
-                log.warn("Failed to close all the producers", e);
-            }
+            // no op
         }
 
         public Function<Throwable, Void> 
getPublishErrorHandler(AbstractSinkRecord<T> record, boolean failSource) {
@@ -153,13 +133,7 @@ public class PulsarSink<T> implements Sink<T> {
         public PulsarSinkAtMostOnceProcessor() {
             if (!(schema instanceof AutoConsumeSchema)) {
                 // initialize default topic
-                try {
-                    publishProducers.put(pulsarSinkConfig.getTopic(),
-                        createProducer(pulsarSinkConfig.getTopic(), schema, 
null));
-                } catch (PulsarClientException e) {
-                    log.error("Failed to create Producer while doing user 
publish", e);
-                    throw new RuntimeException(e);
-                }
+                getProducer(pulsarSinkConfig.getTopic(), schema);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("The Pulsar producer is not initialized until 
the first record is"
@@ -232,13 +206,10 @@ public class PulsarSink<T> implements Sink<T> {
                 // we must use the destination topic schema
                 schemaToWrite = schema;
             }
-            Producer<T> producer = getProducer(
-                    String.format("%s-%s", 
record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()),
-                            record.getPartitionId().get()),
-                    record.getPartitionId().get(),
-                    
record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()),
-                    schemaToWrite
-            );
+            String topicName = 
record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic());
+            String partitionId = record.getPartitionId().get();
+            String producerName = partitionId;
+            Producer<T> producer = getProducer(topicName, schemaToWrite, 
producerName, partitionId);
             if (schemaToWrite != null) {
                 return producer.newMessage(schemaToWrite);
             } else {
@@ -263,13 +234,14 @@ public class PulsarSink<T> implements Sink<T> {
     }
 
     public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, 
Map<String, String> properties,
-                      ComponentStatsManager stats, ClassLoader 
functionClassLoader) {
+                      ComponentStatsManager stats, ClassLoader 
functionClassLoader, ProducerCache producerCache) {
         this.client = client;
         this.pulsarSinkConfig = pulsarSinkConfig;
         this.topicSchema = new TopicSchema(client, functionClassLoader);
         this.properties = properties;
         this.stats = stats;
         this.functionClassLoader = functionClassLoader;
+        this.producerCache = producerCache;
     }
 
     @Override
@@ -341,14 +313,17 @@ public class PulsarSink<T> implements Sink<T> {
         }
     }
 
-    Producer<T> createProducer(String topic, Schema<T> schema, String 
producerName)
-            throws PulsarClientException {
-        ProducerBuilder<T> builder =
-                producerBuilderFactory.createProducerBuilder(topic, schema != 
null ? schema : this.schema,
-                        producerName);
-        return builder
-                .properties(properties)
-                .create();
+    Producer<T> createProducer(String topicName, Schema<T> schema, String 
producerName) {
+        Schema<T> schemaToUse = schema != null ? schema : this.schema;
+        try {
+            log.info("Initializing producer {} on topic {} with schema {}", 
producerName, topicName, schemaToUse);
+            return producerBuilderFactory.createProducerBuilder(topicName, 
schemaToUse, producerName)
+                    .properties(properties)
+                    .create();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException("Failed to create Producer for topic " 
+ topicName
+                    + " producerName " + producerName + " schema " + 
schemaToUse, e);
+        }
     }
 
     @SuppressWarnings("unchecked")
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 115ef1e8a3f..cb4c93f153f 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -72,6 +72,7 @@ import org.assertj.core.util.Lists;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -87,6 +88,7 @@ public class ContextImplTest {
     private PulsarAdmin pulsarAdmin;
     private ContextImpl context;
     private Producer producer;
+    private ProducerCache producerCache;
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws PulsarClientException {
@@ -117,16 +119,24 @@ public class ContextImplTest {
         TypedMessageBuilder messageBuilder = spy(new 
TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
         doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
         when(producer.newMessage()).thenReturn(messageBuilder);
+        
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
+        producerCache = new ProducerCache();
         context = new ContextImpl(
             config,
             logger,
             client,
             new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(), new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder, t -> {});
+                pulsarAdmin, clientBuilder, t -> {}, producerCache);
         context.setCurrentMessageContext((Record<String>) () -> null);
     }
 
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        producerCache.close();
+        producerCache = null;
+    }
+
     @Test(expectedExceptions = IllegalStateException.class)
     public void testIncrCounterStateDisabled() {
         context.incrCounter("test-key", 10);
@@ -237,7 +247,7 @@ public class ContextImplTest {
                 new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(),
                 new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder, t -> {});
+                pulsarAdmin, clientBuilder, t -> {}, producerCache);
         context.getPulsarAdmin();
     }
 
@@ -251,7 +261,7 @@ public class ContextImplTest {
                 new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(),
                 new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder, t -> {});
+                pulsarAdmin, clientBuilder, t -> {}, producerCache);
         try {
             context.seek("z", 0, Mockito.mock(MessageId.class));
             Assert.fail("Expected exception");
@@ -282,7 +292,7 @@ public class ContextImplTest {
                 new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(),
                 new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder, t -> {});
+                pulsarAdmin, clientBuilder, t -> {}, producerCache);
         Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
         
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
         context.setInputConsumers(Lists.newArrayList(mockConsumer));
@@ -314,7 +324,7 @@ public class ContextImplTest {
                 new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(),
                 new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder, t -> {});
+                pulsarAdmin, clientBuilder, t -> {}, producerCache);
         Consumer<?> mockConsumer = Mockito.mock(Consumer.class);
         
when(mockConsumer.getTopic()).thenReturn(TopicName.get("z").toString());
         context.setInputConsumers(Lists.newArrayList(mockConsumer));
@@ -338,7 +348,7 @@ public class ContextImplTest {
                 new EnvironmentBasedSecretsProvider(), 
FunctionCollectorRegistry.getDefaultImplementation(),
                 new String[0],
                 FunctionDetails.ComponentType.FUNCTION, null, new 
InstanceStateManager(),
-                pulsarAdmin, clientBuilder, t -> {});
+                pulsarAdmin, clientBuilder, t -> {}, producerCache);
         ConsumerImpl<?> consumer1 = Mockito.mock(ConsumerImpl.class);
         
when(consumer1.getTopic()).thenReturn(TopicName.get("first").toString());
         ConsumerImpl<?> consumer2 = Mockito.mock(ConsumerImpl.class);
@@ -456,7 +466,7 @@ public class ContextImplTest {
                 pulsarAdmin, clientBuilder, t -> {
                     assertEquals(t, fatalException);
                     fatalInvoked.set(true);
-        });
+        }, producerCache);
         context.fatal(fatalException);
         assertTrue(fatalInvoked.get());
     }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 799bad839a4..8a946a3f757 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -18,13 +18,13 @@
  */
 package org.apache.pulsar.functions.sink;
 
+import static 
org.apache.pulsar.functions.instance.ProducerCache.CacheArea.SINK_RECORD_CACHE;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -37,7 +37,6 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import lombok.Getter;
@@ -65,12 +64,14 @@ import 
org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.instance.ProducerCache;
 import org.apache.pulsar.functions.instance.SinkRecord;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
 import org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessorBase;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.io.core.SinkContext;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -132,6 +133,7 @@ public class PulsarSinkTest {
         doReturn(producer).when(producerBuilder).create();
         doReturn(typedMessageBuilder).when(producer).newMessage();
         
doReturn(typedMessageBuilder).when(producer).newMessage(any(Schema.class));
+        
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
 
         doReturn(producerBuilder).when(pulsarClient).newProducer();
         doReturn(producerBuilder).when(pulsarClient).newProducer(any());
@@ -139,9 +141,17 @@ public class PulsarSinkTest {
         return pulsarClient;
     }
 
-    @BeforeMethod
+    ProducerCache producerCache;
+
+    @BeforeMethod(alwaysRun = true)
     public void setup() {
+        producerCache = new ProducerCache();
+    }
 
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        producerCache.close();
+        producerCache = null;
     }
 
     private static PulsarSinkConfig getPulsarConfigs() {
@@ -182,7 +192,7 @@ public class PulsarSinkTest {
         pulsarConfig.setTypeClassName(Void.class.getName());
         PulsarSink pulsarSink =
                 new PulsarSink(getPulsarClient(), pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class),
-                        Thread.currentThread().getContextClassLoader());
+                        Thread.currentThread().getContextClassLoader(), 
producerCache);
 
         try {
             Schema schema = pulsarSink.initializeSchema();
@@ -202,7 +212,7 @@ public class PulsarSinkTest {
         pulsarConfig.setSerdeClassName(TestSerDe.class.getName());
         PulsarSink pulsarSink =
                 new PulsarSink(getPulsarClient(), pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class),
-                        Thread.currentThread().getContextClassLoader());
+                        Thread.currentThread().getContextClassLoader(), 
producerCache);
         try {
             pulsarSink.initializeSchema();
             fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
@@ -227,7 +237,7 @@ public class PulsarSinkTest {
         pulsarConfig.setTypeClassName(String.class.getName());
         PulsarSink pulsarSink =
                 new PulsarSink(getPulsarClient(), pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class),
-                        Thread.currentThread().getContextClassLoader());
+                        Thread.currentThread().getContextClassLoader(), 
producerCache);
 
         try {
             pulsarSink.initializeSchema();
@@ -248,7 +258,7 @@ public class PulsarSinkTest {
         pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE);
         PulsarSink pulsarSink =
                 new PulsarSink(getPulsarClient(), pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class),
-                        Thread.currentThread().getContextClassLoader());
+                        Thread.currentThread().getContextClassLoader(), 
producerCache);
 
         try {
             pulsarSink.initializeSchema();
@@ -266,7 +276,7 @@ public class PulsarSinkTest {
         pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName());
         PulsarSink pulsarSink =
                 new PulsarSink(getPulsarClient(), pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class),
-                        Thread.currentThread().getContextClassLoader());
+                        Thread.currentThread().getContextClassLoader(), 
producerCache);
 
         try {
             pulsarSink.initializeSchema();
@@ -286,7 +296,7 @@ public class PulsarSinkTest {
         pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
         PulsarSink sink = new PulsarSink(
             pulsarClient, pulsarSinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
-            Thread.currentThread().getContextClassLoader());
+            Thread.currentThread().getContextClassLoader(), producerCache);
         Schema<?> schema = sink.initializeSchema();
         assertTrue(schema instanceof AutoConsumeSchema);
 
@@ -295,7 +305,7 @@ public class PulsarSinkTest {
         pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
         sink = new PulsarSink(
             pulsarClient, pulsarSinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
-            Thread.currentThread().getContextClassLoader());
+            Thread.currentThread().getContextClassLoader(), producerCache);
         schema = sink.initializeSchema();
         assertTrue(schema instanceof AutoConsumeSchema);
 
@@ -306,7 +316,7 @@ public class PulsarSinkTest {
         pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
         sink = new PulsarSink(
             pulsarClient, pulsarSinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
-            Thread.currentThread().getContextClassLoader());
+            Thread.currentThread().getContextClassLoader(), producerCache);
         schema = sink.initializeSchema();
         assertTrue(schema instanceof AutoConsumeSchema);
 
@@ -317,7 +327,7 @@ public class PulsarSinkTest {
         pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
         sink = new PulsarSink(
             pulsarClient, pulsarSinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
-            Thread.currentThread().getContextClassLoader());
+            Thread.currentThread().getContextClassLoader(), producerCache);
         schema = sink.initializeSchema();
         assertTrue(schema instanceof AutoConsumeSchema);
 
@@ -327,7 +337,7 @@ public class PulsarSinkTest {
         pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
         sink = new PulsarSink(
             pulsarClient, pulsarSinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
-            Thread.currentThread().getContextClassLoader());
+            Thread.currentThread().getContextClassLoader(), producerCache);
         schema = sink.initializeSchema();
         assertTrue(schema instanceof AutoConsumeSchema);
     }
@@ -344,9 +354,12 @@ public class PulsarSinkTest {
         /** test MANUAL **/
         pulsarClient = getPulsarClient();
         pulsarConfig.setProcessingGuarantees(ProcessingGuarantees.MANUAL);
-        PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class), 
Thread.currentThread().getContextClassLoader());
+        PulsarSink pulsarSink =
+                new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
+                        Thread.currentThread().getContextClassLoader(), 
producerCache);
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
+        verify(pulsarClient.newProducer(), times(1)).topic(defaultTopic);
 
         for (String topic : topics) {
 
@@ -370,23 +383,19 @@ public class PulsarSinkTest {
             PulsarSink.PulsarSinkManualProcessor pulsarSinkManualProcessor
                     = (PulsarSink.PulsarSinkManualProcessor) 
pulsarSink.pulsarSinkProcessor;
             if (topic != null) {
-                
Assert.assertTrue(pulsarSinkManualProcessor.publishProducers.containsKey(topic));
+                Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, 
topic));
             } else {
-                
Assert.assertTrue(pulsarSinkManualProcessor.publishProducers.containsKey(defaultTopic));
+                Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, 
defaultTopic));
             }
-            verify(pulsarClient.newProducer(), 
times(1)).topic(argThat(otherTopic -> {
-                if (topic != null) {
-                    return topic.equals(otherTopic);
-                } else {
-                    return defaultTopic.equals(otherTopic);
-                }
-            }));
+            String actualTopic = topic != null ? topic : defaultTopic;
+            verify(pulsarClient.newProducer(), times(1)).topic(actualTopic);
         }
 
         /** test At-least-once **/
         pulsarClient = getPulsarClient();
         
pulsarConfig.setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE);
-        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class), 
Thread.currentThread().getContextClassLoader());
+        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class),
+                Thread.currentThread().getContextClassLoader(), producerCache);
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
@@ -410,24 +419,17 @@ public class PulsarSinkTest {
             PulsarSink.PulsarSinkAtLeastOnceProcessor 
pulsarSinkAtLeastOnceProcessor
                     = (PulsarSink.PulsarSinkAtLeastOnceProcessor) 
pulsarSink.pulsarSinkProcessor;
             if (topic != null) {
-                
Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(topic));
+                Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, 
topic));
             } else {
-                
Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(defaultTopic));
+                Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, 
defaultTopic));
             }
-            verify(pulsarClient.newProducer(), 
times(1)).topic(argThat(otherTopic -> {
-                if (topic != null) {
-                    return topic.equals(otherTopic);
-                } else {
-                    return defaultTopic.equals(otherTopic);
-                }
-            }));
         }
 
         /** test At-most-once **/
         pulsarClient = getPulsarClient();
         pulsarConfig.setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE);
         pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class),
-                Thread.currentThread().getContextClassLoader());
+                Thread.currentThread().getContextClassLoader(), producerCache);
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
@@ -457,20 +459,17 @@ public class PulsarSinkTest {
             PulsarSink.PulsarSinkAtMostOnceProcessor 
pulsarSinkAtLeastOnceProcessor
                     = (PulsarSink.PulsarSinkAtMostOnceProcessor) 
pulsarSink.pulsarSinkProcessor;
             if (topic != null) {
-                
Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(topic));
+                Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, 
topic));
             } else {
-                
Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(defaultTopic));
+                Assert.assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, 
defaultTopic));
             }
-            verify(pulsarClient.newProducer(), times(1)).topic(argThat(o -> {
-                return getTopicEquals(o, topic, defaultTopic);
-            }));
         }
 
         /** test Effectively-once **/
         pulsarClient = getPulsarClient();
         
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
         pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class),
-                Thread.currentThread().getContextClassLoader());
+                Thread.currentThread().getContextClassLoader(), producerCache);
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
@@ -520,23 +519,19 @@ public class PulsarSinkTest {
             PulsarSink.PulsarSinkEffectivelyOnceProcessor 
pulsarSinkEffectivelyOnceProcessor
                     = (PulsarSink.PulsarSinkEffectivelyOnceProcessor) 
pulsarSink.pulsarSinkProcessor;
             if (topic != null) {
-                
Assert.assertTrue(pulsarSinkEffectivelyOnceProcessor.publishProducers
-                        .containsKey(String.format("%s-%s-id-1", topic, 
topic)));
+                Assert.assertTrue(producerCache
+                        .containsKey(SINK_RECORD_CACHE, topic, 
String.format("%s-id-1", topic)));
             } else {
-                
Assert.assertTrue(pulsarSinkEffectivelyOnceProcessor.publishProducers
-                        .containsKey(String.format("%s-%s-id-1", defaultTopic, 
defaultTopic)));
+                Assert.assertTrue(producerCache
+                        .containsKey(SINK_RECORD_CACHE,
+                                defaultTopic, String.format("%s-id-1", 
defaultTopic)
+                        ));
             }
 
-            verify(pulsarClient.newProducer(), times(1)).topic(argThat(o -> {
-                return getTopicEquals(o, topic, defaultTopic);
-            }));
-            verify(pulsarClient.newProducer(), 
times(1)).producerName(argThat(o -> {
-                if (topic != null) {
-                    return String.format("%s-id-1", topic).equals(o);
-                } else {
-                    return String.format("%s-id-1", defaultTopic).equals(o);
-                }
-            }));
+            String expectedTopicName = topic != null ? topic : defaultTopic;
+            verify(pulsarClient.newProducer(), 
times(1)).topic(expectedTopicName);
+            String expectedProducerName = String.format("%s-id-1", 
expectedTopicName);
+            verify(pulsarClient.newProducer(), 
times(1)).producerName(expectedProducerName);
         }
     }
 
@@ -566,7 +561,7 @@ public class PulsarSinkTest {
         PulsarClient client = getPulsarClient();
         PulsarSink pulsarSink = new PulsarSink(
             client, sinkConfig, new HashMap<>(), 
mock(ComponentStatsManager.class),
-            Thread.currentThread().getContextClassLoader());
+            Thread.currentThread().getContextClassLoader(), producerCache);
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
@@ -578,7 +573,7 @@ public class PulsarSinkTest {
             assertTrue(pulsarSink.pulsarSinkProcessor instanceof 
PulsarSink.PulsarSinkEffectivelyOnceProcessor);
         }
         PulsarSinkProcessorBase processor = (PulsarSinkProcessorBase) 
pulsarSink.pulsarSinkProcessor;
-        assertFalse(processor.publishProducers.containsKey(defaultTopic));
+        assertFalse(producerCache.containsKey(SINK_RECORD_CACHE, 
defaultTopic));
 
         String[] topics = {"topic-1", "topic-2", "topic-3"};
         for (String topic : topics) {
@@ -625,17 +620,15 @@ public class PulsarSinkTest {
             pulsarSink.write(record);
 
             if (ProcessingGuarantees.EFFECTIVELY_ONCE == guarantees) {
-                
assertTrue(processor.publishProducers.containsKey(String.format("%s-%s-id-1", 
topic, topic)));
+                assertTrue(producerCache.containsKey(SINK_RECORD_CACHE,
+                        topic, String.format("%s-id-1", topic)
+                ));
             } else {
-                assertTrue(processor.publishProducers.containsKey(topic));
+                assertTrue(producerCache.containsKey(SINK_RECORD_CACHE, 
topic));
             }
-            verify(client.newProducer(), times(1))
-                .topic(argThat(
-                    otherTopic -> topic != null ? topic.equals(otherTopic) : 
defaultTopic.equals(otherTopic)));
-
-            verify(client, times(1))
-                .newProducer(argThat(
-                    otherSchema -> Objects.equals(otherSchema, schema)));
+            String expectedTopicName = topic != null ? topic : defaultTopic;
+            verify(client.newProducer(), times(1)).topic(expectedTopicName);
+            verify(client, times(1)).newProducer(schema);
         }
     }
 
@@ -646,13 +639,4 @@ public class PulsarSinkTest {
             return Optional.empty();
         }
     }
-
-    private boolean getTopicEquals(Object o, String topic, String 
defaultTopic) {
-        if (topic != null) {
-            return topic.equals(o);
-        } else {
-            return defaultTopic.equals(o);
-        }
-    }
-
 }

Reply via email to