This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new fdaa9e3 Fix topic name logic for partitioned topics (#3693) fdaa9e3 is described below commit fdaa9e3728e463bc67f5e946833d1dac392412e2 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Mar 15 15:05:53 2019 -0700 Fix topic name logic for partitioned topics (#3693) * Since partitioned topics have a -partition-<partitionid> affixed to the topic name, when doing explicit acking, check for the case to determine the right topic name * added unittests --- .../instance/src/main/python/contextimpl.py | 21 +++++++++++++++------ .../src/scripts/run_python_instance_tests.sh | 3 ++- .../src/test/python/test_python_instance.py | 22 +++++++++++++++++++++- 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 638e64f..f3a9710 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -23,6 +23,7 @@ """contextimpl.py: ContextImpl class that implements the Context interface """ +import re import time import os import json @@ -54,7 +55,6 @@ class ContextImpl(pulsar.Context): self.publish_producers = {} self.publish_serializers = {} self.message = None - self.current_input_topic_name = None self.current_start_time = None self.user_config = json.loads(instance_config.function_details.userConfig) \ if instance_config.function_details.userConfig \ @@ -73,7 +73,6 @@ class ContextImpl(pulsar.Context): # Called on a per message basis to set the context for the current message def set_current_message_context(self, message, topic): self.message = message - self.current_input_topic_name = topic self.current_start_time = time.time() def get_message_id(self): @@ -89,7 +88,7 @@ class ContextImpl(pulsar.Context): return self.message.properties() def get_current_message_topic_name(self): - return self.current_input_topic_name + return self.message.topic_name() def get_function_name(self): return self.instance_config.function_details.name @@ -176,9 +175,19 @@ class ContextImpl(pulsar.Context): self.publish_producers[topic_name].send_async(output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), properties=properties) def ack(self, msgid, topic): - if topic not in self.consumers: - raise ValueError('Invalid topicname %s' % topic) - self.consumers[topic].acknowledge(msgid) + topic_consumer = None + if topic in self.consumers: + topic_consumer = self.consumers[topic] + else: + # if this topic is a partitioned topic + m = re.search('(.+)-partition-(\d+)', topic) + if not m: + raise ValueError('Invalid topicname %s' % topic) + elif m.group(1) in self.consumers: + topic_consumer = self.consumers[m.group(1)] + else: + raise ValueError('Invalid topicname %s' % topic) + topic_consumer.acknowledge(msgid) def get_and_reset_metrics(self): metrics = self.get_metrics() diff --git a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh index 9b33c24..7005b9b 100644 --- a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh +++ b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh @@ -22,9 +22,10 @@ # Make sure dependencies are installed pip install mock --user pip install protobuf --user +pip install fastavro --user CUR_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" PULSAR_HOME=$CUR_DIR/../../../../ # run instance tests -PULSAR_HOME=${PULSAR_HOME} PYTHONPATH=${PULSAR_HOME}/pulsar-functions/instance/target/python-instance python -m unittest discover -v ${PULSAR_HOME}/pulsar-functions/instance/target/python-instance/tests \ No newline at end of file +PULSAR_HOME=${PULSAR_HOME} PYTHONPATH=${PULSAR_HOME}/pulsar-functions/instance/target/python-instance python -m unittest discover -v ${PULSAR_HOME}/pulsar-functions/instance/target/python-instance/tests diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_python_instance.py index 0071e2f..8b92fa8 100644 --- a/pulsar-functions/instance/src/test/python/test_python_instance.py +++ b/pulsar-functions/instance/src/test/python/test_python_instance.py @@ -20,9 +20,12 @@ # DEPENDENCIES: unittest2,mock +from mock import Mock +import sys +sys.modules['prometheus_client'] = Mock() + from contextimpl import ContextImpl from python_instance import InstanceConfig -from mock import Mock from pulsar import Message import Function_pb2 @@ -68,4 +71,21 @@ class TestContextImpl(unittest.TestCase): self.assertEqual(args[1].args[1], "test_topic_name") self.assertEqual(args[1].args[2], "test_message_id") + def test_context_ack_partitionedtopic(self): + instance_id = 'test_instance_id' + function_id = 'test_function_id' + function_version = 'test_function_version' + function_details = Function_pb2.FunctionDetails() + max_buffered_tuples = 100; + instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples) + logger = log.Log + pulsar_client = Mock() + user_code=__file__ + consumer = Mock() + consumer.acknowledge = Mock(return_value=None) + consumers = {"mytopic" : consumer} + context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers, None, None, None, None) + context_impl.ack("test_message_id", "mytopic-partition-3") + args, kwargs = consumer.acknowledge.call_args + self.assertEqual(args[0], "test_message_id") \ No newline at end of file