This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 03b06a6  report sys errors when failed to produce successfully (#3668)
03b06a6 is described below

commit 03b06a661ddc6932ad4de6c80a0157ae3681d576
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Sat Feb 23 19:09:29 2019 -0800

    report sys errors when failed to produce successfully (#3668)
    
    * report sys errors when failed to produce successfully
    
    * enhancing error message
    
    * adding metrics to context publish
    
    * enhancing error logging and adding java error handling
    
    * fixing tests
    
    * cleaning up
    
    * addressing comments and fix bug
    
    * cleaning up
    
    * fix bug
    
    * fix test
    
    * check is none
    
    * fix unit test
    
    * cleaning up
---
 .../functions/instance/JavaInstanceRunnable.java   |  2 +-
 .../instance/stats/ComponentStatsManager.java      |  6 +-
 .../instance/stats/FunctionStatsManager.java       |  8 +--
 .../functions/instance/stats/SinkStatsManager.java |  6 +-
 .../instance/stats/SourceStatsManager.java         |  6 +-
 .../apache/pulsar/functions/sink/PulsarSink.java   | 48 ++++++++++++---
 .../instance/src/main/python/contextimpl.py        | 19 +++++-
 .../instance/src/main/python/python_instance.py    | 17 ++++--
 .../pulsar/functions/sink/PulsarSinkTest.java      | 71 ++++++++++++----------
 .../src/test/python/test_python_instance.py        | 19 +++++-
 10 files changed, 134 insertions(+), 68 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index cd6a94f..19a0b2a 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -707,7 +707,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
                 pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
 
-                object = new PulsarSink(this.client, pulsarSinkConfig, 
this.properties);
+                object = new PulsarSink(this.client, pulsarSinkConfig, 
this.properties, this.stats);
             }
         } else {
             object = Reflections.createInstance(
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index 7c81018..38a5de6 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -99,11 +99,11 @@ public abstract class ComponentStatsManager implements 
AutoCloseable {
 
     public abstract void incrSysExceptions(Throwable sysException);
 
-    public abstract void incrUserExceptions(Exception userException);
+    public abstract void incrUserExceptions(Throwable userException);
 
-    public abstract void incrSourceExceptions(Exception userException);
+    public abstract void incrSourceExceptions(Throwable userException);
 
-    public abstract void incrSinkExceptions(Exception userException);
+    public abstract void incrSinkExceptions(Throwable userException);
 
     public abstract void setLastInvocation(long ts);
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index d03bfe2..e4e03f8 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -236,7 +236,7 @@ public class FunctionStatsManager extends 
ComponentStatsManager{
         sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 
1, TimeUnit.MINUTES);
     }
 
-    public void addUserException(Exception ex) {
+    public void addUserException(Throwable ex) {
         long ts = System.currentTimeMillis();
         InstanceCommunication.FunctionStatus.ExceptionInformation info = 
getExceptionInfo(ex, ts);
         latestUserExceptions.add(info);
@@ -284,19 +284,19 @@ public class FunctionStatsManager extends 
ComponentStatsManager{
     }
 
     @Override
-    public void incrUserExceptions(Exception userException) {
+    public void incrUserExceptions(Throwable userException) {
         _statTotalUserExceptions.inc();
         _statTotalUserExceptions1min.inc();
         addUserException(userException);
     }
 
     @Override
-    public void incrSourceExceptions(Exception ex) {
+    public void incrSourceExceptions(Throwable ex) {
         incrSysExceptions(ex);
     }
 
     @Override
-    public void incrSinkExceptions(Exception ex) {
+    public void incrSinkExceptions(Throwable ex) {
         incrSysExceptions(ex);
     }
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index 14e4c21..1a23957 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -224,17 +224,17 @@ public class SinkStatsManager extends 
ComponentStatsManager {
     }
 
     @Override
-    public void incrUserExceptions(Exception ex) {
+    public void incrUserExceptions(Throwable ex) {
         incrSysExceptions(ex);
     }
 
     @Override
-    public void incrSourceExceptions(Exception ex) {
+    public void incrSourceExceptions(Throwable ex) {
         incrSysExceptions(ex);
     }
 
     @Override
-    public void incrSinkExceptions(Exception ex) {
+    public void incrSinkExceptions(Throwable ex) {
         long ts = System.currentTimeMillis();
         InstanceCommunication.FunctionStatus.ExceptionInformation info = 
getExceptionInfo(ex, ts);
         latestSinkExceptions.add(info);
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index 5679f2e..87a60a3 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -223,12 +223,12 @@ public class SourceStatsManager extends 
ComponentStatsManager {
     }
 
     @Override
-    public void incrUserExceptions(Exception ex) {
+    public void incrUserExceptions(Throwable ex) {
         incrSysExceptions(ex);
     }
 
     @Override
-    public void incrSourceExceptions(Exception ex) {
+    public void incrSourceExceptions(Throwable ex) {
         long ts = System.currentTimeMillis();
         InstanceCommunication.FunctionStatus.ExceptionInformation info = 
getExceptionInfo(ex, ts);
         latestSourceExceptions.add(info);
@@ -243,7 +243,7 @@ public class SourceStatsManager extends 
ComponentStatsManager {
     }
 
     @Override
-    public void incrSinkExceptions(Exception ex) {
+    public void incrSinkExceptions(Throwable ex) {
         incrSysExceptions(ex);
     }
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 717ac10..8c32608 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -30,15 +31,14 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
-import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.instance.SinkRecord;
+import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
@@ -50,6 +50,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 @Slf4j
 public class PulsarSink<T> implements Sink<T> {
@@ -57,6 +58,7 @@ public class PulsarSink<T> implements Sink<T> {
     private final PulsarClient client;
     private final PulsarSinkConfig pulsarSinkConfig;
     private final Map<String, String> properties;
+    private ComponentStatsManager stats;
 
     @VisibleForTesting
     PulsarSinkProcessor<T> pulsarSinkProcessor;
@@ -133,6 +135,27 @@ public class PulsarSink<T> implements Sink<T> {
                 log.warn("Failed to close all the producers", e);
             }
         }
+
+        public Function<Throwable, Void> getPublishErrorHandler(Record<T> 
record) {
+
+            return throwable -> {
+                SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
+                Record<T> srcRecord = sinkRecord.getSourceRecord();
+
+                String topic = 
record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic());
+
+                String errorMsg = null;
+                if (srcRecord instanceof PulsarRecord) {
+                    errorMsg = String.format("Failed to publish to topic [%s] 
with error [%s] with src message id [%s]", topic, throwable.getMessage(), 
((PulsarRecord) srcRecord).getMessageId());
+                    log.error(errorMsg);
+                } else {
+                    errorMsg = String.format("Failed to publish to topic [%s] 
with error [%s] with src sequence id [%s]", topic, throwable.getMessage(), 
record.getRecordSequence().get());
+                    log.error(errorMsg);
+                }
+                stats.incrSinkExceptions(new Exception(errorMsg));
+                return null;
+            };
+        }
     }
 
     @VisibleForTesting
@@ -155,7 +178,9 @@ public class PulsarSink<T> implements Sink<T> {
 
         @Override
         public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> 
record) throws Exception {
-            msg.sendAsync();
+            msg.sendAsync().thenAccept(messageId -> {
+                //no op
+            }).exceptionally(getPublishErrorHandler(record));
         }
     }
 
@@ -167,14 +192,15 @@ public class PulsarSink<T> implements Sink<T> {
 
         @Override
         public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> 
record) throws Exception {
-            msg.sendAsync().thenAccept(messageId -> record.ack());
+            msg.sendAsync()
+                    .thenAccept(messageId -> record.ack())
+                    .exceptionally(getPublishErrorHandler(record));
         }
     }
 
     @VisibleForTesting
     class PulsarSinkEffectivelyOnceProcessor extends PulsarSinkProcessorBase {
 
-
         public PulsarSinkEffectivelyOnceProcessor(Schema schema) {
             super(schema);
         }
@@ -202,17 +228,19 @@ public class PulsarSink<T> implements Sink<T> {
 
             // assign sequence id to output message for idempotent producing
             msg.sequenceId(record.getRecordSequence().get());
-            msg.sendAsync()
-                    .thenAccept(messageId -> record.ack())
-                    .join();
+            CompletableFuture<MessageId> future = msg.sendAsync();
+
+            future.thenAccept(messageId -> 
record.ack()).exceptionally(getPublishErrorHandler(record));
+            future.join();
         }
     }
 
-    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, 
Map<String, String> properties) {
+    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, 
Map<String, String> properties, ComponentStatsManager stats) {
         this.client = client;
         this.pulsarSinkConfig = pulsarSinkConfig;
         this.topicSchema = new TopicSchema(client);
         this.properties = properties;
+        this.stats = stats;
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py 
b/pulsar-functions/instance/src/main/python/contextimpl.py
index e1517b5..638e64f 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -26,19 +26,24 @@
 import time
 import os
 import json
+import log
 
 import pulsar
 import util
 
 from prometheus_client import Summary
 from function_stats import Stats
+from functools import partial
+
+Log = log.Log
 
 class ContextImpl(pulsar.Context):
 
   # add label to indicate user metric
   user_metrics_label_names = Stats.metrics_label_names + ["metric"]
 
-  def __init__(self, instance_config, logger, pulsar_client, user_code, 
consumers, secrets_provider, metrics_labels, state_context):
+  def __init__(self, instance_config, logger, pulsar_client, user_code, 
consumers,
+               secrets_provider, metrics_labels, state_context, stats):
     self.instance_config = instance_config
     self.log = logger
     self.pulsar_client = pulsar_client
@@ -63,6 +68,7 @@ class ContextImpl(pulsar.Context):
     self.user_metrics_summary = Summary("pulsar_function_user_metric",
                                     'Pulsar Function user defined metric',
                                         ContextImpl.user_metrics_label_names)
+    self.stats = stats
 
   # Called on a per message basis to set the context for the current message
   def set_current_message_context(self, message, topic):
@@ -127,13 +133,20 @@ class ContextImpl(pulsar.Context):
 
     self.user_metrics_map[metric_name].observe(metric_value)
 
-
   def get_output_topic(self):
     return self.instance_config.function_details.output
 
   def get_output_serde_class_name(self):
     return self.instance_config.function_details.outputSerdeClassName
 
+  def callback_wrapper(self, callback, topic, message_id, result, msg):
+    if result != pulsar.Result.Ok:
+      error_msg = "Failed to publish to topic [%s] with error [%s] with src 
message id [%s]" % (topic, result, message_id)
+      Log.error(error_msg)
+      self.stats.incr_total_sys_exceptions(Exception(error_msg))
+    if callback:
+      callback(result, msg)
+
   def publish(self, topic_name, message, 
serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, 
callback=None):
     # Just make sure that user supplied values are properly typed
     topic_name = str(topic_name)
@@ -160,7 +173,7 @@ class ContextImpl(pulsar.Context):
       self.publish_serializers[serde_class_name] = serde_klass()
 
     output_bytes = 
bytes(self.publish_serializers[serde_class_name].serialize(message))
-    self.publish_producers[topic_name].send_async(output_bytes, callback, 
properties=properties)
+    self.publish_producers[topic_name].send_async(output_bytes, 
partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), 
properties=properties)
 
   def ack(self, msgid, topic):
     if topic not in self.consumers:
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index b018e6d..8f740f1 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -201,7 +201,8 @@ class PythonInstance(object):
 
     self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, 
self.pulsar_client,
                                                self.user_code, self.consumers,
-                                               self.secrets_provider, 
self.metrics_labels, self.state_context)
+                                               self.secrets_provider, 
self.metrics_labels,
+                                               self.state_context, self.stats)
     # Now launch a thread that does execution
     self.execution_thread = threading.Thread(target=self.actual_execution)
     self.execution_thread.start()
@@ -260,9 +261,15 @@ class PythonInstance(object):
         Log.error("Uncaught exception in Python instance: %s" % e);
         self.stats.incr_total_sys_exceptions(e)
 
-  def done_producing(self, consumer, orig_message, result, sent_message):
-    if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once:
-      consumer.acknowledge(orig_message)
+  def done_producing(self, consumer, orig_message, topic, result, 
sent_message):
+    if result == pulsar.Result.Ok:
+      if self.auto_ack:
+        consumer.acknowledge(orig_message)
+    else:
+      error_msg = "Failed to publish to topic [%s] with error [%s] with src 
message id [%s]" % (topic, result, orig_message.message_id())
+      Log.error(error_msg)
+      self.stats.incr_total_sys_exceptions(Exception(error_msg))
+
 
   def process_result(self, output, msg):
     if output is not None and self.instance_config.function_details.sink.topic 
!= None and \
@@ -277,7 +284,7 @@ class PythonInstance(object):
 
       if output_bytes is not None:
         props = {"__pfn_input_topic__" : str(msg.topic), 
"__pfn_input_msg_id__" : base64ify(msg.message.message_id().serialize())}
-        self.producer.send_async(output_bytes, partial(self.done_producing, 
msg.consumer, msg.message), properties=props)
+        self.producer.send_async(output_bytes, partial(self.done_producing, 
msg.consumer, msg.message, self.producer.topic()), properties=props)
     elif self.auto_ack and self.atleast_once:
       msg.consumer.acknowledge(msg.message)
 
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 53cfeb6..caf5dec 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -18,6 +18,36 @@
  */
 package org.apache.pulsar.functions.sink;
 
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.instance.SinkRecord;
+import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
+import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.ArgumentMatcher;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
@@ -33,29 +63,6 @@ import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 import static org.testng.AssertJUnit.fail;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.client.api.*;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.SinkRecord;
-import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.SinkContext;
-import org.mockito.ArgumentMatcher;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
 @Slf4j
 public class PulsarSinkTest {
 
@@ -160,7 +167,7 @@ public class PulsarSinkTest {
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(Void.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, new HashMap<>());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
 
         try {
             Schema schema = pulsarSink.initializeSchema();
@@ -178,7 +185,7 @@ public class PulsarSinkTest {
         // set type to be inconsistent to that of SerDe
         pulsarConfig.setTypeClassName(Integer.class.getName());
         pulsarConfig.setSerdeClassName(TestSerDe.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, new HashMap<>());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
         try {
             pulsarSink.initializeSchema();
             fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
@@ -200,7 +207,7 @@ public class PulsarSinkTest {
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, new HashMap<>());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
 
         try {
             pulsarSink.initializeSchema();
@@ -219,7 +226,7 @@ public class PulsarSinkTest {
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
         pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE);
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, new HashMap<>());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
 
         try {
             pulsarSink.initializeSchema();
@@ -235,7 +242,7 @@ public class PulsarSinkTest {
         // set type to void
         pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
         pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, new HashMap<>());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), 
pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
 
         try {
             pulsarSink.initializeSchema();
@@ -245,8 +252,6 @@ public class PulsarSinkTest {
         }
     }
 
-
-
     @Test
     public void testSinkAndMessageRouting() throws Exception {
 
@@ -259,7 +264,7 @@ public class PulsarSinkTest {
         /** test At-least-once **/
         pulsarClient = getPulsarClient();
         
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-        PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new 
HashMap<>());
+        PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class));
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
@@ -316,7 +321,7 @@ public class PulsarSinkTest {
         /** test At-most-once **/
         pulsarClient = getPulsarClient();
         
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
-        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new 
HashMap<>());
+        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class));
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
@@ -373,7 +378,7 @@ public class PulsarSinkTest {
         /** test Effectively-once **/
         pulsarClient = getPulsarClient();
         
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
-        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new 
HashMap<>());
+        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new 
HashMap<>(), mock(ComponentStatsManager.class));
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py 
b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 9c90597..0071e2f 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -23,6 +23,7 @@
 from contextimpl import ContextImpl
 from python_instance import InstanceConfig
 from mock import Mock
+from pulsar import Message
 
 import Function_pb2
 import log
@@ -31,6 +32,12 @@ import unittest
 
 class TestContextImpl(unittest.TestCase):
 
+  def Any(cls):
+    class Any(cls):
+      def __eq__(self, other):
+        return True
+    return Any()
+
   def setUp(self):
     log.init_logger("INFO", "foo", os.environ.get("PULSAR_HOME") + 
"/conf/functions-logging/console_logging_config.ini")
 
@@ -48,11 +55,17 @@ class TestContextImpl(unittest.TestCase):
     pulsar_client.create_producer = Mock(return_value=producer)
     user_code=__file__
     consumers = None
-    context_impl = ContextImpl(instance_config, logger, pulsar_client, 
user_code, consumers, None, None, None)
+    context_impl = ContextImpl(instance_config, logger, pulsar_client, 
user_code, consumers, None, None, None, None)
 
-    context_impl.publish("test_topic_name", "test_message")
+    msg = Message()
+    msg.message_id = Mock(return_value="test_message_id")
+    context_impl.set_current_message_context(msg, "test_topic_name")
 
-    producer.send_async.assert_called_with("test_message", None, 
properties=None)
+    context_impl.publish("test_topic_name", "test_message")
 
+    args, kwargs = producer.send_async.call_args
+    self.assertEqual(args[0].decode("utf-8"), "test_message")
+    self.assertEqual(args[1].args[1], "test_topic_name")
+    self.assertEqual(args[1].args[2], "test_message_id")
 
 

Reply via email to