This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5c756ee Optimize batch source discovery and task ack (#8498) 5c756ee is described below commit 5c756ee1b8c7a4360520bde2c3f5596826d70fc6 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Tue Nov 10 14:14:54 2020 -0800 Optimize batch source discovery and task ack (#8498) Co-authored-by: Jerry Peng <jer...@splunk.com> --- .../functions/instance/JavaInstanceRunnable.java | 6 +- .../source/batch/BatchSourceExecutor.java | 99 +++++++++++++---- .../source/batch/BatchSourceExecutorTest.java | 120 +++++++++++++-------- 3 files changed, 154 insertions(+), 71 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 2b2f360..6b27868 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -460,7 +460,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } } - private Record readInput() { + private Record readInput() throws Exception { Record record; if (!(this.source instanceof PulsarSource)) { Thread.currentThread().setContextClassLoader(functionClassLoader); @@ -469,8 +469,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { record = this.source.read(); } catch (Exception e) { stats.incrSourceExceptions(e); - log.info("Encountered exception in source read: ", e); - throw new RuntimeException(e); + log.error("Encountered exception in source read", e); + throw e; } finally { Thread.currentThread().setContextClassLoader(instanceClassLoader); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java index 4474b9b..5937616 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.source.batch; import com.google.gson.Gson; +import io.netty.util.concurrent.DefaultThreadFactory; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -41,6 +42,9 @@ import org.apache.pulsar.io.core.SourceContext; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * BatchSourceExecutor wraps BatchSource as Source. Thus from Pulsar IO perspective, it is running a regular @@ -63,13 +67,16 @@ public class BatchSourceExecutor<T> implements Source<T> { private String batchSourceClassName; private BatchSource<T> batchSource; private String intermediateTopicName; + private volatile Exception currentError = null; + private volatile boolean isRunning = false; + private ExecutorService discoveryThread = Executors.newSingleThreadExecutor(new DefaultThreadFactory("batch-source-discovery")); @Override public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { this.config = config; this.sourceContext = sourceContext; this.intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(sourceContext.getTenant(), - sourceContext.getNamespace(), sourceContext.getSourceName()).toString(); + sourceContext.getNamespace(), sourceContext.getSourceName()).toString(); this.getBatchSourceConfigs(config); this.initializeBatchSource(); this.start(); @@ -78,14 +85,21 @@ public class BatchSourceExecutor<T> implements Source<T> { @Override public Record<T> read() throws Exception { while (true) { + if (currentError != null) { + throw currentError; + } if (currentTask == null) { - retrieveNextTask(); - prepareInternal(); + currentTask = retrieveNextTask(); + prepareInternal(currentTask); } Record<T> retval = batchSource.readNext(); if (retval == null) { // signals end if this batch - intermediateTopicConsumer.acknowledge(currentTask.getMessageId()); + intermediateTopicConsumer.acknowledgeAsync(currentTask.getMessageId()).exceptionally(throwable -> { + log.error("Encountered error when acknowledging completed task with id {}", currentTask.getMessageId(), throwable); + setCurrentError(throwable); + return null; + }); currentTask = null; } else { return retval; @@ -95,7 +109,7 @@ public class BatchSourceExecutor<T> implements Source<T> { private void getBatchSourceConfigs(Map<String, Object> config) { if (!config.containsKey(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY) - || !config.containsKey(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY)) { + || !config.containsKey(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY)) { throw new IllegalArgumentException("Batch Configs cannot be found"); } @@ -108,18 +122,18 @@ public class BatchSourceExecutor<T> implements Source<T> { // First init the batchsource ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); Object userClassObject = Reflections.createInstance( - batchSourceClassName, - clsLoader); + batchSourceClassName, + clsLoader); if (userClassObject instanceof BatchSource) { - batchSource = (BatchSource) userClassObject; + batchSource = (BatchSource) userClassObject; } else { throw new IllegalArgumentException("BatchSource does not implement the correct interface"); } // next init the discovery triggerer Object discoveryClassObject = Reflections.createInstance( - batchSourceConfig.getDiscoveryTriggererClassName(), - clsLoader); + batchSourceConfig.getDiscoveryTriggererClassName(), + clsLoader); if (discoveryClassObject instanceof BatchSourceTriggerer) { discoveryTriggerer = (BatchSourceTriggerer) discoveryClassObject; } else { @@ -128,22 +142,38 @@ public class BatchSourceExecutor<T> implements Source<T> { } private void start() throws Exception { + isRunning = true; createIntermediateTopicConsumer(); batchSource.open(this.config, this.sourceContext); if (sourceContext.getInstanceId() == 0) { discoveryTriggerer.init(batchSourceConfig.getDiscoveryTriggererConfig(), - this.sourceContext); + this.sourceContext); discoveryTriggerer.start(this::triggerDiscover); } } - private void triggerDiscover(String discoveredEvent) { - try { - batchSource.discover((task) -> this.taskEater(discoveredEvent, task)); - } catch (Exception e) { - log.error("Error on discover", e); - throw new RuntimeException(e); + volatile boolean discoverInProgress = false; + private synchronized void triggerDiscover(String discoveredEvent) { + + if (discoverInProgress) { + log.info("Discovery is already in progress"); + return; + } else { + discoverInProgress = true; } + // Run this code asynchronous so it doesn't block processing of the tasks + discoveryThread.submit(() -> { + try { + batchSource.discover(task -> taskEater(discoveredEvent, task)); + } catch (Exception e) { + if (isRunning || !(e instanceof InterruptedException)) { + log.error("Encountered error during task discovery", e); + setCurrentError(e); + } + } finally { + discoverInProgress = false; + } + }); } private void taskEater(String discoveredEvent, byte[] task) { @@ -153,6 +183,7 @@ public class BatchSourceExecutor<T> implements Source<T> { properties.put("produceTime", String.valueOf(System.currentTimeMillis())); TypedMessageBuilder<byte[]> message = sourceContext.newOutputMessage(intermediateTopicName, Schema.BYTES); message.value(task).properties(properties); + // Note: we can only make this send async if the api returns a future to the connector so that errors can be handled by the connector message.send(); } catch (Exception e) { log.error("error writing discovered task to intermediate topic", e); @@ -160,9 +191,9 @@ public class BatchSourceExecutor<T> implements Source<T> { } } - private void prepareInternal() { + private void prepareInternal(Message<byte[]> task) { try { - batchSource.prepare(currentTask.getValue()); + batchSource.prepare(task.getValue()); } catch (Exception e) { log.error("Error on prepare", e); throw new RuntimeException(e); @@ -175,6 +206,7 @@ public class BatchSourceExecutor<T> implements Source<T> { } private void stop() throws Exception { + isRunning = false; Exception ex = null; if (discoveryTriggerer != null) { try { @@ -185,6 +217,14 @@ public class BatchSourceExecutor<T> implements Source<T> { } discoveryTriggerer = null; } + + discoveryThread.shutdownNow(); + try { + discoveryThread.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.warn("Shutdown of discovery thread was interrupted"); + } + if (intermediateTopicConsumer != null) { try { intermediateTopicConsumer.close(); @@ -255,11 +295,24 @@ public class BatchSourceExecutor<T> implements Source<T> { } } - - private void retrieveNextTask() throws Exception { - currentTask = intermediateTopicConsumer.receive(); - return; + private Message<byte[]> retrieveNextTask() throws Exception { + while(true) { + if (currentError != null) { + throw currentError; + } + Message<byte[]> taskMessage = intermediateTopicConsumer.receive(5, TimeUnit.SECONDS); + if (taskMessage != null) { + return taskMessage; + } + } } + private void setCurrentError(Throwable error) { + if (error instanceof Exception) { + currentError = (Exception) error; + } else { + currentError = new RuntimeException(error.getCause()); + } + } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java index 5125453..65a8928 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutorTest.java @@ -21,25 +21,28 @@ package org.apache.pulsar.functions.source.batch; import com.google.gson.Gson; import lombok.Getter; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.common.io.BatchSourceConfig; import org.apache.pulsar.functions.api.Record; - import org.apache.pulsar.io.core.BatchPushSource; import org.apache.pulsar.io.core.BatchSource; import org.apache.pulsar.io.core.BatchSourceTriggerer; import org.apache.pulsar.io.core.SourceContext; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.*; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; /** @@ -49,13 +52,13 @@ public class BatchSourceExecutorTest { public static class TestBatchSource implements BatchSource<String> { @Getter - private static int prepareCount; + public static int prepareCount; @Getter - private static int discoverCount; + public static int discoverCount; @Getter - private static int recordCount; + public static int recordCount; @Getter - private static int closeCount; + public static int closeCount; private Record record = Mockito.mock(Record.class); public TestBatchSource() { } @@ -93,15 +96,22 @@ public class BatchSourceExecutorTest { } } + public static class TestBatchSourceFailDiscovery extends TestBatchSource { + @Override + public void discover(Consumer<byte[]> taskEater) throws Exception { + throw new Exception("test"); + } + } + public static class TestBatchPushSource extends BatchPushSource<String> { @Getter - private static int prepareCount; + public static int prepareCount; @Getter - private static int discoverCount; + public static int discoverCount; @Getter - private static int recordCount; + public static int recordCount; @Getter - private static int closeCount; + public static int closeCount; private Record record = Mockito.mock(Record.class); public TestBatchPushSource() { } @@ -135,7 +145,10 @@ public class BatchSourceExecutorTest { } } + public static LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>(); + public static LinkedBlockingQueue<String> completedQueue = new LinkedBlockingQueue<>(); public static class TestDiscoveryTriggerer implements BatchSourceTriggerer { + @Getter private Consumer<String> trigger; private Thread thread; @@ -150,12 +163,12 @@ public class BatchSourceExecutorTest { @Override public void start(Consumer<String> trigger) { + this.trigger = trigger; thread = new Thread(() -> { while(true) { try { - Thread.sleep(100); - trigger.accept("Triggered"); + trigger.accept(triggerQueue.take()); } catch (InterruptedException e) { break; } @@ -186,7 +199,6 @@ public class BatchSourceExecutorTest { private ConsumerBuilder consumerBuilder; private org.apache.pulsar.client.api.Consumer<byte[]> consumer; private TypedMessageBuilder<byte[]> messageBuilder; - private CyclicBarrier discoveryBarrier; private Message<byte[]> discoveredTask; private static Map<String, Object> createConfig(String className, BatchSourceConfig batchConfig) { @@ -208,6 +220,14 @@ public class BatchSourceExecutorTest { @BeforeMethod public void setUp() throws Exception { + TestBatchSource.closeCount = 0; + TestBatchSource.discoverCount = 0; + TestBatchSource.prepareCount = 0; + TestBatchSource.recordCount = 0; + TestBatchPushSource.closeCount = 0; + TestBatchPushSource.discoverCount = 0; + TestBatchPushSource.prepareCount = 0; + TestBatchPushSource.recordCount = 0; testBatchSource = new TestBatchSource(); testBatchPushSource = new TestBatchPushSource(); batchSourceExecutor = new BatchSourceExecutor<>(); @@ -225,9 +245,12 @@ public class BatchSourceExecutorTest { Mockito.doReturn(consumerBuilder).when(consumerBuilder).properties(Mockito.anyMap()); Mockito.doReturn(consumerBuilder).when(consumerBuilder).topic(Mockito.any()); discoveredTask = Mockito.mock(Message.class); + Mockito.doReturn(MessageId.latest).when(discoveredTask).getMessageId(); consumer = Mockito.mock(org.apache.pulsar.client.api.Consumer.class); Mockito.doReturn(discoveredTask).when(consumer).receive(); + Mockito.doReturn(discoveredTask).when(consumer).receive(Mockito.anyInt(), Mockito.any()); Mockito.doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync(); + Mockito.doReturn(CompletableFuture.completedFuture(null)).when(consumer).acknowledgeAsync(Mockito.any(MessageId.class)); Mockito.doReturn(consumerBuilder).when(context).newConsumerBuilder(Schema.BYTES); messageBuilder = Mockito.mock(TypedMessageBuilder.class); Mockito.doReturn(messageBuilder).when(messageBuilder).value(Mockito.any()); @@ -235,16 +258,13 @@ public class BatchSourceExecutorTest { Mockito.doReturn(messageBuilder).when(context).newOutputMessage(Mockito.anyString(), Mockito.any()); // Discovery - discoveryBarrier = new CyclicBarrier(2); - Mockito.doAnswer(new Answer<MessageId>() { - @Override public MessageId answer(InvocationOnMock invocation) { - try { - discoveryBarrier.await(); - } catch (Exception e) { - throw new RuntimeException(); - } - return null; + Mockito.doAnswer((Answer<MessageId>) invocation -> { + try { + completedQueue.put("done"); + } catch (Exception e) { + throw new RuntimeException(); } + return null; }).when(messageBuilder).send(); } @@ -333,43 +353,53 @@ public class BatchSourceExecutorTest { batchSourceExecutor.open(pushConfig, context); } - @Test + @Test (timeOut = 5000) public void testLifeCycle() throws Exception { batchSourceExecutor.open(config, context); - Assert.assertTrue(testBatchSource.getDiscoverCount() < 1); - discoveryBarrier.await(); - Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1); - Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2); + Assert.assertEquals(testBatchSource.getDiscoverCount(), 0); + triggerQueue.put("trigger"); + completedQueue.take(); + Assert.assertEquals(testBatchSource.getDiscoverCount(), 1); for (int i = 0; i < 5; ++i) { batchSourceExecutor.read(); } Assert.assertEquals(testBatchSource.getRecordCount(), 6); - Assert.assertTrue(testBatchSource.getDiscoverCount() >= 1); - Assert.assertTrue(testBatchSource.getDiscoverCount() <= 2); - discoveryBarrier.await(); - Assert.assertTrue(testBatchSource.getDiscoverCount() >= 2); - Assert.assertTrue(testBatchSource.getDiscoverCount() <= 3); + Assert.assertEquals(testBatchSource.getDiscoverCount(), 1); + triggerQueue.put("trigger"); + completedQueue.take(); + Assert.assertTrue(testBatchSource.getDiscoverCount() == 2); batchSourceExecutor.close(); Assert.assertEquals(testBatchSource.getCloseCount(), 1); } - @Test + @Test (timeOut = 5000) public void testPushLifeCycle() throws Exception { batchSourceExecutor.open(pushConfig, context); - Assert.assertTrue(testBatchPushSource.getDiscoverCount() < 1); - discoveryBarrier.await(); - Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1); - Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2); + Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 0); + triggerQueue.put("trigger"); + completedQueue.take(); + Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 1); for (int i = 0; i < 5; ++i) { batchSourceExecutor.read(); } Assert.assertEquals(testBatchPushSource.getRecordCount(), 5); - Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1); - Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2); - discoveryBarrier.await(); - Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 2); - Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 3); + Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 1); + triggerQueue.put("trigger"); + completedQueue.take(); + Assert.assertEquals(testBatchPushSource.getDiscoverCount(), 2); batchSourceExecutor.close(); Assert.assertEquals(testBatchPushSource.getCloseCount(), 1); } + + + @Test(expectedExceptions = Exception.class, expectedExceptionsMessageRegExp = "test", timeOut = 1000) + public void testDiscoveryPhaseError() throws Exception { + config = createConfig(TestBatchSourceFailDiscovery.class.getName(), testBatchConfig); + batchSourceExecutor.open(config, context); + triggerQueue.put("trigger"); + while (true) { + batchSourceExecutor.read(); + Thread.sleep(100); + } + } } \ No newline at end of file