This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 07cebb1 On publish failures, log error and count them as sys exceptions (#3704)
07cebb1 is described below

commit 07cebb17f5df0bc3de70acacc5981271396129a9
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Mon Mar 4 22:12:56 2019 -0800

    On publish failures, log error and count them as sys exceptions (#3704)

    * On publish failures, log error and count them as sys exceptions

    * Took feedback
---
 .../org/apache/pulsar/functions/instance/ContextImpl.java | 13 +++++++++++--
 .../pulsar/functions/instance/JavaInstanceRunnable.java | 2 +-
 .../apache/pulsar/functions/instance/ContextImplTest.java | 2 +-
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 7271c87..60b8ec0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -89,6 +89,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
     private StateContextImpl stateContext;
     private Map<String, Object> userConfigs;
+    private ComponentStatsManager statsManager;
+
     Map<String, String[]> userMetricsLabels = new HashMap<>();
     private final String[] metricsLabels;
     private final Summary userMetricsSummary;
@@ -103,12 +105,13 @@ class ContextImpl implements Context, SinkContext, SourceContext {
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
                        SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
-                       Utils.ComponentType componentType) {
+                       Utils.ComponentType componentType, ComponentStatsManager statsManager) {
         this.config = config;
         this.logger = logger;
         this.publishProducers = new
HashMap<>();
         this.inputTopics = inputTopics;
         this.topicSchema = new TopicSchema(client);
+        this.statsManager = statsManager;
         this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
@@ -359,7 +362,13 @@ class ContextImpl implements Context, SinkContext, SourceContext {
             }
         }
-        return producer.sendAsync(object).thenApply(msgId -> null);
+        CompletableFuture<Void> future = producer.sendAsync(object).thenApply(msgId -> null);
+        future.exceptionally(e -> {
+            this.statsManager.incrSysExceptions(e);
+            logger.error("Failed to publish to topic {} with error {}", topicName, e);
+            return null;
+        });
+        return future;
     }
     @Override

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 19a0b2a..c9bf644 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -215,7 +215,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
         return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider,
-                collectorRegistry, metricsLabels, this.componentType);
+                collectorRegistry, metricsLabels, this.componentType, this.stats);
     }
     /**

diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 47c3539..7523fb1 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -79,7 +79,7 @@ public class ContextImplTest {
             client, new ArrayList<>(), new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
-            Utils.ComponentType.FUNCTION);
+            Utils.ComponentType.FUNCTION, null);
     }
     @Test(expectedExceptions = IllegalStateException.class)