This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 07cebb1  On publish failures, log error and count them as sys 
exceptions (#3704)
07cebb1 is described below

commit 07cebb17f5df0bc3de70acacc5981271396129a9
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Mon Mar 4 22:12:56 2019 -0800

    On publish failures, log error and count them as sys exceptions (#3704)
    
    * On publish failures, log error and count them as sys exceptions
    
    * Took feedback
---
 .../org/apache/pulsar/functions/instance/ContextImpl.java   | 13 +++++++++++--
 .../pulsar/functions/instance/JavaInstanceRunnable.java     |  2 +-
 .../apache/pulsar/functions/instance/ContextImplTest.java   |  2 +-
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 7271c87..60b8ec0 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -89,6 +89,8 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
     private StateContextImpl stateContext;
     private Map<String, Object> userConfigs;
 
+    private ComponentStatsManager statsManager;
+
     Map<String, String[]> userMetricsLabels = new HashMap<>();
     private final String[] metricsLabels;
     private final Summary userMetricsSummary;
@@ -103,12 +105,13 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient 
client, List<String> inputTopics,
                        SecretsProvider secretsProvider, CollectorRegistry 
collectorRegistry, String[] metricsLabels,
-                       Utils.ComponentType componentType) {
+                       Utils.ComponentType componentType, 
ComponentStatsManager statsManager) {
         this.config = config;
         this.logger = logger;
         this.publishProducers = new HashMap<>();
         this.inputTopics = inputTopics;
         this.topicSchema = new TopicSchema(client);
+        this.statsManager = statsManager;
 
         this.producerBuilder = (ProducerBuilderImpl<?>) 
client.newProducer().blockIfQueueFull(true).enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
@@ -359,7 +362,13 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
             }
         }
 
-        return producer.sendAsync(object).thenApply(msgId -> null);
+        CompletableFuture<Void> future = 
producer.sendAsync(object).thenApply(msgId -> null);
+        future.exceptionally(e -> {
+            this.statsManager.incrSysExceptions(e);
+            logger.error("Failed to publish to topic {} with error {}", 
topicName, e);
+            return null;
+        });
+        return future;
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 19a0b2a..c9bf644 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -215,7 +215,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
         return new ContextImpl(instanceConfig, instanceLog, client, 
inputTopics, secretsProvider,
-                collectorRegistry, metricsLabels, this.componentType);
+                collectorRegistry, metricsLabels, this.componentType, 
this.stats);
     }
 
     /**
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 47c3539..7523fb1 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -79,7 +79,7 @@ public class ContextImplTest {
             client,
             new ArrayList<>(),
             new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), 
new String[0],
-                Utils.ComponentType.FUNCTION);
+                Utils.ComponentType.FUNCTION, null);
     }
 
     @Test(expectedExceptions = IllegalStateException.class)

Reply via email to