This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new fc0ef90 [functions][worker] timeout creating producer for worker (#2738) fc0ef90 is described below commit fc0ef9012f8e5d18985c6f897d35b9b55f7b7479 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Fri Oct 5 16:31:51 2018 -0700 [functions][worker] timeout creating producer for worker (#2738) *Motivation* Sometimes when we run the worker service as part of the broker, some pods can hang while creating producers to the assignment topics. It is unknown whether it is a k8s problem or not. But in general, we should time out and fail fast to allow k8s to reschedule the pods. *Changes* Add timeout logic when creating producers. --- .../pulsar/functions/worker/SchedulerManager.java | 53 ++++++++++++++++------ .../functions/worker/SchedulerManagerTest.java | 15 +----- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index 69725ad..24217cf 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -18,19 +18,19 @@ */ package org.apache.pulsar.functions.worker; -import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction; - +import com.google.common.base.Stopwatch; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; import 
java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -88,21 +88,48 @@ public class SchedulerManager implements AutoCloseable { this.scheduler = Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class, Thread.currentThread().getContextClassLoader()); - try { - this.producer = pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic()) - .enableBatching(false).blockIfQueueFull(true).compressionType(CompressionType.LZ4). - sendTimeout(0, TimeUnit.MILLISECONDS).create(); - } catch (PulsarClientException e) { - log.error("Failed to create producer to function assignment topic " - + this.workerConfig.getFunctionAssignmentTopic(), e); - throw new RuntimeException(e); - } - + this.producer = createProducer(pulsarClient, workerConfig); this.executorService = executor; scheduleCompaction(executor, workerConfig.getTopicCompactionFrequencySec()); } + private static Producer<byte[]> createProducer(PulsarClient client, WorkerConfig config) { + Stopwatch stopwatch = Stopwatch.createStarted(); + for (int i = 0; i < 6; i++) { + try { + return client.newProducer().topic(config.getFunctionAssignmentTopic()) + .enableBatching(false) + .blockIfQueueFull(true) + .compressionType(CompressionType.LZ4) + .sendTimeout(0, TimeUnit.MILLISECONDS) + .createAsync().get(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + log.error("Encountered exceptions at creating producer for topic {}", + config.getFunctionAssignmentTopic(), e); + throw new RuntimeException(e); + } catch (TimeoutException e) { + try { + log.info("Can't create a producer on assignment topic {} in {} seconds, retry in 10 seconds ...", + stopwatch.elapsed(TimeUnit.SECONDS)); + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException e1) 
{ + log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + continue; + } + } + throw new RuntimeException("Can't create a producer on assignment topic " + + config.getFunctionAssignmentTopic() + " in " + stopwatch.elapsed(TimeUnit.SECONDS) + + " seconds, fail fast ..."); + } + public Future<?> schedule() { return executorService.submit(() -> { synchronized (SchedulerManager.this) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index d3e0118..6dde08c 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -53,7 +53,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.Assignment; -import org.apache.pulsar.functions.proto.Request; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler; import org.mockito.Mockito; @@ -81,18 +80,6 @@ public class SchedulerManagerTest { private TypedMessageBuilder<byte[]> message; private ScheduledExecutorService executor; - private static PulsarClient mockPulsarClient() throws PulsarClientException { - ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class); - when(builder.topic(anyString())).thenReturn(builder); - - when(builder.create()).thenReturn(mock(Producer.class)); - - PulsarClient client = mock(PulsarClient.class); - when(client.newProducer()).thenReturn(builder); - - return client; - } - @BeforeMethod public void setup() throws 
PulsarClientException { WorkerConfig workerConfig = new WorkerConfig(); @@ -121,7 +108,7 @@ public class SchedulerManagerTest { when(builder.compressionType(any(CompressionType.class))).thenReturn(builder); when(builder.sendTimeout(anyInt(), any(TimeUnit.class))).thenReturn(builder); - when(builder.create()).thenReturn(producer); + when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer)); PulsarClient pulsarClient = mock(PulsarClient.class); when(pulsarClient.newProducer()).thenReturn(builder);