sijie closed pull request #2738: [functions][worker] timeout creating producer 
for worker
URL: https://github.com/apache/pulsar/pull/2738
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 69725adce4..24217cfbd6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -18,19 +18,19 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import static 
org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction;
-
+import com.google.common.base.Stopwatch;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -88,21 +88,48 @@ public SchedulerManager(WorkerConfig workerConfig, 
PulsarClient pulsarClient, Pu
         this.scheduler = 
Reflections.createInstance(workerConfig.getSchedulerClassName(), 
IScheduler.class,
                 Thread.currentThread().getContextClassLoader());
 
-        try {
-            this.producer = 
pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic())
-                    
.enableBatching(false).blockIfQueueFull(true).compressionType(CompressionType.LZ4).
-                    sendTimeout(0, TimeUnit.MILLISECONDS).create();
-        } catch (PulsarClientException e) {
-            log.error("Failed to create producer to function assignment topic "
-                    + this.workerConfig.getFunctionAssignmentTopic(), e);
-            throw new RuntimeException(e);
-        }
-
+        this.producer = createProducer(pulsarClient, workerConfig);
         this.executorService = executor;
         
         scheduleCompaction(executor, 
workerConfig.getTopicCompactionFrequencySec());
     }
 
+    private static Producer<byte[]> createProducer(PulsarClient client, 
WorkerConfig config) {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        for (int i = 0; i < 6; i++) {
+            try {
+                return 
client.newProducer().topic(config.getFunctionAssignmentTopic())
+                    .enableBatching(false)
+                    .blockIfQueueFull(true)
+                    .compressionType(CompressionType.LZ4)
+                    .sendTimeout(0, TimeUnit.MILLISECONDS)
+                    .createAsync().get(10, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                log.error("Interrupted at creating producer to topic {}", 
config.getFunctionAssignmentTopic(), e);
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                log.error("Encountered exceptions at creating producer for 
topic {}",
+                    config.getFunctionAssignmentTopic(), e);
+                throw new RuntimeException(e);
+            } catch (TimeoutException e) {
+                try {
+                    log.info("Can't create a producer on assignment topic {} 
in {} seconds, retry in 10 seconds ...",
+                        stopwatch.elapsed(TimeUnit.SECONDS));
+                    TimeUnit.SECONDS.sleep(10);
+                } catch (InterruptedException e1) {
+                    log.error("Interrupted at creating producer to topic {}", 
config.getFunctionAssignmentTopic(), e);
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+                continue;
+            }
+        }
+        throw new RuntimeException("Can't create a producer on assignment 
topic "
+            + config.getFunctionAssignmentTopic() + " in " + 
stopwatch.elapsed(TimeUnit.SECONDS)
+            + " seconds, fail fast ...");
+    }
+
     public Future<?> schedule() {
         return executorService.submit(() -> {
             synchronized (SchedulerManager.this) {
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index d3e0118bb8..6dde08c69a 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -53,7 +53,6 @@
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.Request;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
@@ -81,18 +80,6 @@
     private TypedMessageBuilder<byte[]> message;
     private ScheduledExecutorService executor;
 
-    private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
-        ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
-        when(builder.topic(anyString())).thenReturn(builder);
-
-        when(builder.create()).thenReturn(mock(Producer.class));
-
-        PulsarClient client = mock(PulsarClient.class);
-        when(client.newProducer()).thenReturn(builder);
-
-        return client;
-    }
-
     @BeforeMethod
     public void setup() throws PulsarClientException {
         WorkerConfig workerConfig = new WorkerConfig();
@@ -121,7 +108,7 @@ public void setup() throws PulsarClientException {
         
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
         when(builder.sendTimeout(anyInt(), 
any(TimeUnit.class))).thenReturn(builder);
 
-        when(builder.create()).thenReturn(producer);
+        
when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer));
 
         PulsarClient pulsarClient = mock(PulsarClient.class);
         when(pulsarClient.newProducer()).thenReturn(builder);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to