sijie closed pull request #2738: [functions][worker] timeout creating producer for worker URL: https://github.com/apache/pulsar/pull/2738
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index 69725adce4..24217cfbd6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -18,19 +18,19 @@ */ package org.apache.pulsar.functions.worker; -import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction; - +import com.google.common.base.Stopwatch; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -88,21 +88,48 @@ public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, Pu this.scheduler = Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class, Thread.currentThread().getContextClassLoader()); - try { - this.producer = pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic()) - .enableBatching(false).blockIfQueueFull(true).compressionType(CompressionType.LZ4). 
- sendTimeout(0, TimeUnit.MILLISECONDS).create(); - } catch (PulsarClientException e) { - log.error("Failed to create producer to function assignment topic " - + this.workerConfig.getFunctionAssignmentTopic(), e); - throw new RuntimeException(e); - } - + this.producer = createProducer(pulsarClient, workerConfig); this.executorService = executor; scheduleCompaction(executor, workerConfig.getTopicCompactionFrequencySec()); } + private static Producer<byte[]> createProducer(PulsarClient client, WorkerConfig config) { + Stopwatch stopwatch = Stopwatch.createStarted(); + for (int i = 0; i < 6; i++) { + try { + return client.newProducer().topic(config.getFunctionAssignmentTopic()) + .enableBatching(false) + .blockIfQueueFull(true) + .compressionType(CompressionType.LZ4) + .sendTimeout(0, TimeUnit.MILLISECONDS) + .createAsync().get(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + log.error("Encountered exceptions at creating producer for topic {}", + config.getFunctionAssignmentTopic(), e); + throw new RuntimeException(e); + } catch (TimeoutException e) { + try { + log.info("Can't create a producer on assignment topic {} in {} seconds, retry in 10 seconds ...", + stopwatch.elapsed(TimeUnit.SECONDS)); + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException e1) { + log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + continue; + } + } + throw new RuntimeException("Can't create a producer on assignment topic " + + config.getFunctionAssignmentTopic() + " in " + stopwatch.elapsed(TimeUnit.SECONDS) + + " seconds, fail fast ..."); + } + public Future<?> schedule() { return executorService.submit(() -> { synchronized 
(SchedulerManager.this) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index d3e0118bb8..6dde08c69a 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -53,7 +53,6 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.Assignment; -import org.apache.pulsar.functions.proto.Request; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler; import org.mockito.Mockito; @@ -81,18 +80,6 @@ private TypedMessageBuilder<byte[]> message; private ScheduledExecutorService executor; - private static PulsarClient mockPulsarClient() throws PulsarClientException { - ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class); - when(builder.topic(anyString())).thenReturn(builder); - - when(builder.create()).thenReturn(mock(Producer.class)); - - PulsarClient client = mock(PulsarClient.class); - when(client.newProducer()).thenReturn(builder); - - return client; - } - @BeforeMethod public void setup() throws PulsarClientException { WorkerConfig workerConfig = new WorkerConfig(); @@ -121,7 +108,7 @@ public void setup() throws PulsarClientException { when(builder.compressionType(any(CompressionType.class))).thenReturn(builder); when(builder.sendTimeout(anyInt(), any(TimeUnit.class))).thenReturn(builder); - when(builder.create()).thenReturn(producer); + when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer)); PulsarClient pulsarClient = mock(PulsarClient.class); when(pulsarClient.newProducer()).thenReturn(builder); 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services