This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 03b06a6 report sys errors when failed to produce successfully (#3668) 03b06a6 is described below commit 03b06a661ddc6932ad4de6c80a0157ae3681d576 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Sat Feb 23 19:09:29 2019 -0800 report sys errors when failed to produce successfully (#3668) * report sys errors when failed to produce successfully * enhancing error message * adding metrics to context publish * enhancing error logging and adding java error handling * fixing tests * cleaning up * addressing comments and fix bug * cleaning up * fix bug * fix test * check is none * fix unit test * cleaning up --- .../functions/instance/JavaInstanceRunnable.java | 2 +- .../instance/stats/ComponentStatsManager.java | 6 +- .../instance/stats/FunctionStatsManager.java | 8 +-- .../functions/instance/stats/SinkStatsManager.java | 6 +- .../instance/stats/SourceStatsManager.java | 6 +- .../apache/pulsar/functions/sink/PulsarSink.java | 48 ++++++++++++--- .../instance/src/main/python/contextimpl.py | 19 +++++- .../instance/src/main/python/python_instance.py | 17 ++++-- .../pulsar/functions/sink/PulsarSinkTest.java | 71 ++++++++++++---------- .../src/test/python/test_python_instance.py | 19 +++++- 10 files changed, 134 insertions(+), 68 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index cd6a94f..19a0b2a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -707,7 +707,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName()); - object = new 
PulsarSink(this.client, pulsarSinkConfig, this.properties); + object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats); } } else { object = Reflections.createInstance( diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java index 7c81018..38a5de6 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java @@ -99,11 +99,11 @@ public abstract class ComponentStatsManager implements AutoCloseable { public abstract void incrSysExceptions(Throwable sysException); - public abstract void incrUserExceptions(Exception userException); + public abstract void incrUserExceptions(Throwable userException); - public abstract void incrSourceExceptions(Exception userException); + public abstract void incrSourceExceptions(Throwable userException); - public abstract void incrSinkExceptions(Exception userException); + public abstract void incrSinkExceptions(Throwable userException); public abstract void setLastInvocation(long ts); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index d03bfe2..e4e03f8 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -236,7 +236,7 @@ public class FunctionStatsManager extends ComponentStatsManager{ sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } - public void 
addUserException(Exception ex) { + public void addUserException(Throwable ex) { long ts = System.currentTimeMillis(); InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts); latestUserExceptions.add(info); @@ -284,19 +284,19 @@ public class FunctionStatsManager extends ComponentStatsManager{ } @Override - public void incrUserExceptions(Exception userException) { + public void incrUserExceptions(Throwable userException) { _statTotalUserExceptions.inc(); _statTotalUserExceptions1min.inc(); addUserException(userException); } @Override - public void incrSourceExceptions(Exception ex) { + public void incrSourceExceptions(Throwable ex) { incrSysExceptions(ex); } @Override - public void incrSinkExceptions(Exception ex) { + public void incrSinkExceptions(Throwable ex) { incrSysExceptions(ex); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index 14e4c21..1a23957 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -224,17 +224,17 @@ public class SinkStatsManager extends ComponentStatsManager { } @Override - public void incrUserExceptions(Exception ex) { + public void incrUserExceptions(Throwable ex) { incrSysExceptions(ex); } @Override - public void incrSourceExceptions(Exception ex) { + public void incrSourceExceptions(Throwable ex) { incrSysExceptions(ex); } @Override - public void incrSinkExceptions(Exception ex) { + public void incrSinkExceptions(Throwable ex) { long ts = System.currentTimeMillis(); InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts); latestSinkExceptions.add(info); diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index 5679f2e..87a60a3 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -223,12 +223,12 @@ public class SourceStatsManager extends ComponentStatsManager { } @Override - public void incrUserExceptions(Exception ex) { + public void incrUserExceptions(Throwable ex) { incrSysExceptions(ex); } @Override - public void incrSourceExceptions(Exception ex) { + public void incrSourceExceptions(Throwable ex) { long ts = System.currentTimeMillis(); InstanceCommunication.FunctionStatus.ExceptionInformation info = getExceptionInfo(ex, ts); latestSourceExceptions.add(info); @@ -243,7 +243,7 @@ public class SourceStatsManager extends ComponentStatsManager { } @Override - public void incrSinkExceptions(Exception ex) { + public void incrSinkExceptions(Throwable ex) { incrSysExceptions(ex); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 717ac10..8c32608 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.HashingScheme; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import 
org.apache.pulsar.client.api.ProducerBuilder; @@ -30,15 +31,14 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.FunctionResultRouter; -import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.instance.SinkRecord; +import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.source.TopicSchema; -import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.functions.utils.Reflections; -import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.SinkContext; @@ -50,6 +50,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; @Slf4j public class PulsarSink<T> implements Sink<T> { @@ -57,6 +58,7 @@ public class PulsarSink<T> implements Sink<T> { private final PulsarClient client; private final PulsarSinkConfig pulsarSinkConfig; private final Map<String, String> properties; + private ComponentStatsManager stats; @VisibleForTesting PulsarSinkProcessor<T> pulsarSinkProcessor; @@ -133,6 +135,27 @@ public class PulsarSink<T> implements Sink<T> { log.warn("Failed to close all the producers", e); } } + + public Function<Throwable, Void> getPublishErrorHandler(Record<T> record) { + + return throwable -> { + SinkRecord<T> sinkRecord = (SinkRecord<T>) record; + Record<T> srcRecord = sinkRecord.getSourceRecord(); + + String topic = record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()); + + String 
errorMsg = null; + if (srcRecord instanceof PulsarRecord) { + errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]", topic, throwable.getMessage(), ((PulsarRecord) srcRecord).getMessageId()); + log.error(errorMsg); + } else { + errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src sequence id [%s]", topic, throwable.getMessage(), record.getRecordSequence().get()); + log.error(errorMsg); + } + stats.incrSinkExceptions(new Exception(errorMsg)); + return null; + }; + } } @VisibleForTesting @@ -155,7 +178,9 @@ public class PulsarSink<T> implements Sink<T> { @Override public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception { - msg.sendAsync(); + msg.sendAsync().thenAccept(messageId -> { + //no op + }).exceptionally(getPublishErrorHandler(record)); } } @@ -167,14 +192,15 @@ public class PulsarSink<T> implements Sink<T> { @Override public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception { - msg.sendAsync().thenAccept(messageId -> record.ack()); + msg.sendAsync() + .thenAccept(messageId -> record.ack()) + .exceptionally(getPublishErrorHandler(record)); } } @VisibleForTesting class PulsarSinkEffectivelyOnceProcessor extends PulsarSinkProcessorBase { - public PulsarSinkEffectivelyOnceProcessor(Schema schema) { super(schema); } @@ -202,17 +228,19 @@ public class PulsarSink<T> implements Sink<T> { // assign sequence id to output message for idempotent producing msg.sequenceId(record.getRecordSequence().get()); - msg.sendAsync() - .thenAccept(messageId -> record.ack()) - .join(); + CompletableFuture<MessageId> future = msg.sendAsync(); + + future.thenAccept(messageId -> record.ack()).exceptionally(getPublishErrorHandler(record)); + future.join(); } } - public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties) { + public PulsarSink(PulsarClient client, PulsarSinkConfig 
pulsarSinkConfig, Map<String, String> properties, ComponentStatsManager stats) { this.client = client; this.pulsarSinkConfig = pulsarSinkConfig; this.topicSchema = new TopicSchema(client); this.properties = properties; + this.stats = stats; } @Override diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index e1517b5..638e64f 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -26,19 +26,24 @@ import time import os import json +import log import pulsar import util from prometheus_client import Summary from function_stats import Stats +from functools import partial + +Log = log.Log class ContextImpl(pulsar.Context): # add label to indicate user metric user_metrics_label_names = Stats.metrics_label_names + ["metric"] - def __init__(self, instance_config, logger, pulsar_client, user_code, consumers, secrets_provider, metrics_labels, state_context): + def __init__(self, instance_config, logger, pulsar_client, user_code, consumers, + secrets_provider, metrics_labels, state_context, stats): self.instance_config = instance_config self.log = logger self.pulsar_client = pulsar_client @@ -63,6 +68,7 @@ class ContextImpl(pulsar.Context): self.user_metrics_summary = Summary("pulsar_function_user_metric", 'Pulsar Function user defined metric', ContextImpl.user_metrics_label_names) + self.stats = stats # Called on a per message basis to set the context for the current message def set_current_message_context(self, message, topic): @@ -127,13 +133,20 @@ class ContextImpl(pulsar.Context): self.user_metrics_map[metric_name].observe(metric_value) - def get_output_topic(self): return self.instance_config.function_details.output def get_output_serde_class_name(self): return self.instance_config.function_details.outputSerdeClassName + def callback_wrapper(self, callback, topic, message_id, result, msg): + if result != 
pulsar.Result.Ok: + error_msg = "Failed to publish to topic [%s] with error [%s] with src message id [%s]" % (topic, result, message_id) + Log.error(error_msg) + self.stats.incr_total_sys_exceptions(Exception(error_msg)) + if callback: + callback(result, msg) + def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None): # Just make sure that user supplied values are properly typed topic_name = str(topic_name) @@ -160,7 +173,7 @@ class ContextImpl(pulsar.Context): self.publish_serializers[serde_class_name] = serde_klass() output_bytes = bytes(self.publish_serializers[serde_class_name].serialize(message)) - self.publish_producers[topic_name].send_async(output_bytes, callback, properties=properties) + self.publish_producers[topic_name].send_async(output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), properties=properties) def ack(self, msgid, topic): if topic not in self.consumers: diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index b018e6d..8f740f1 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -201,7 +201,8 @@ class PythonInstance(object): self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client, self.user_code, self.consumers, - self.secrets_provider, self.metrics_labels, self.state_context) + self.secrets_provider, self.metrics_labels, + self.state_context, self.stats) # Now launch a thread that does execution self.execution_thread = threading.Thread(target=self.actual_execution) self.execution_thread.start() @@ -260,9 +261,15 @@ class PythonInstance(object): Log.error("Uncaught exception in Python instance: %s" % e); self.stats.incr_total_sys_exceptions(e) - def done_producing(self, consumer, orig_message, result, sent_message): - if 
result == pulsar.Result.Ok and self.auto_ack and self.atleast_once: - consumer.acknowledge(orig_message) + def done_producing(self, consumer, orig_message, topic, result, sent_message): + if result == pulsar.Result.Ok: + if self.auto_ack: + consumer.acknowledge(orig_message) + else: + error_msg = "Failed to publish to topic [%s] with error [%s] with src message id [%s]" % (topic, result, orig_message.message_id()) + Log.error(error_msg) + self.stats.incr_total_sys_exceptions(Exception(error_msg)) + def process_result(self, output, msg): if output is not None and self.instance_config.function_details.sink.topic != None and \ @@ -277,7 +284,7 @@ class PythonInstance(object): if output_bytes is not None: props = {"__pfn_input_topic__" : str(msg.topic), "__pfn_input_msg_id__" : base64ify(msg.message.message_id().serialize())} - self.producer.send_async(output_bytes, partial(self.done_producing, msg.consumer, msg.message), properties=props) + self.producer.send_async(output_bytes, partial(self.done_producing, msg.consumer, msg.message, self.producer.topic()), properties=props) elif self.auto_ack and self.atleast_once: msg.consumer.acknowledge(msg.message) diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index 53cfeb6..caf5dec 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -18,6 +18,36 @@ */ package org.apache.pulsar.functions.sink; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; 
+import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.instance.SinkRecord; +import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; +import org.apache.pulsar.functions.source.TopicSchema; +import org.apache.pulsar.io.core.SinkContext; +import org.mockito.ArgumentMatcher; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; @@ -33,29 +63,6 @@ import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.fail; -import java.io.IOException; -import java.util.HashMap; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; - -import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.functions.api.SerDe; -import org.apache.pulsar.functions.instance.SinkRecord; -import org.apache.pulsar.functions.source.TopicSchema; -import org.apache.pulsar.common.functions.FunctionConfig; -import org.apache.pulsar.functions.utils.Utils; -import org.apache.pulsar.io.core.SinkContext; -import org.mockito.ArgumentMatcher; -import org.testng.Assert; -import 
org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - @Slf4j public class PulsarSinkTest { @@ -160,7 +167,7 @@ public class PulsarSinkTest { PulsarSinkConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(Void.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class)); try { Schema schema = pulsarSink.initializeSchema(); @@ -178,7 +185,7 @@ public class PulsarSinkTest { // set type to be inconsistent to that of SerDe pulsarConfig.setTypeClassName(Integer.class.getName()); pulsarConfig.setSerdeClassName(TestSerDe.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class)); try { pulsarSink.initializeSchema(); fail("Should fail constructing java instance if function type is inconsistent with serde type"); @@ -200,7 +207,7 @@ public class PulsarSinkTest { PulsarSinkConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(String.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class)); try { pulsarSink.initializeSchema(); @@ -219,7 +226,7 @@ public class PulsarSinkTest { // set type to void pulsarConfig.setTypeClassName(String.class.getName()); pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class)); try { pulsarSink.initializeSchema(); 
@@ -235,7 +242,7 @@ public class PulsarSinkTest { // set type to void pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName()); pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class)); try { pulsarSink.initializeSchema(); @@ -245,8 +252,6 @@ public class PulsarSinkTest { } } - - @Test public void testSinkAndMessageRouting() throws Exception { @@ -259,7 +264,7 @@ public class PulsarSinkTest { /** test At-least-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); - PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>()); + PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class)); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); @@ -316,7 +321,7 @@ public class PulsarSinkTest { /** test At-most-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE); - pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>()); + pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class)); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); @@ -373,7 +378,7 @@ public class PulsarSinkTest { /** test Effectively-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE); - pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>()); + pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class)); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); diff --git 
a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_python_instance.py index 9c90597..0071e2f 100644 --- a/pulsar-functions/instance/src/test/python/test_python_instance.py +++ b/pulsar-functions/instance/src/test/python/test_python_instance.py @@ -23,6 +23,7 @@ from contextimpl import ContextImpl from python_instance import InstanceConfig from mock import Mock +from pulsar import Message import Function_pb2 import log @@ -31,6 +32,12 @@ import unittest class TestContextImpl(unittest.TestCase): + def Any(cls): + class Any(cls): + def __eq__(self, other): + return True + return Any() + def setUp(self): log.init_logger("INFO", "foo", os.environ.get("PULSAR_HOME") + "/conf/functions-logging/console_logging_config.ini") @@ -48,11 +55,17 @@ class TestContextImpl(unittest.TestCase): pulsar_client.create_producer = Mock(return_value=producer) user_code=__file__ consumers = None - context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers, None, None, None) + context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers, None, None, None, None) - context_impl.publish("test_topic_name", "test_message") + msg = Message() + msg.message_id = Mock(return_value="test_message_id") + context_impl.set_current_message_context(msg, "test_topic_name") - producer.send_async.assert_called_with("test_message", None, properties=None) + context_impl.publish("test_topic_name", "test_message") + args, kwargs = producer.send_async.call_args + self.assertEqual(args[0].decode("utf-8"), "test_message") + self.assertEqual(args[1].args[1], "test_topic_name") + self.assertEqual(args[1].args[2], "test_message_id")