This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fdaa9e3  Fix topic name logic for partitioned topics (#3693)
fdaa9e3 is described below

commit fdaa9e3728e463bc67f5e946833d1dac392412e2
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Fri Mar 15 15:05:53 2019 -0700

    Fix topic name logic for partitioned topics (#3693)
    
    * Since partitioned topics have a -partition-<partitionid> affixed to the 
topic name,
    when doing explicit acking, check for the case to determine the right topic 
name
    
    * added unittests
---
 .../instance/src/main/python/contextimpl.py        | 21 +++++++++++++++------
 .../src/scripts/run_python_instance_tests.sh       |  3 ++-
 .../src/test/python/test_python_instance.py        | 22 +++++++++++++++++++++-
 3 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py 
b/pulsar-functions/instance/src/main/python/contextimpl.py
index 638e64f..f3a9710 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -23,6 +23,7 @@
 """contextimpl.py: ContextImpl class that implements the Context interface
 """
 
+import re
 import time
 import os
 import json
@@ -54,7 +55,6 @@ class ContextImpl(pulsar.Context):
     self.publish_producers = {}
     self.publish_serializers = {}
     self.message = None
-    self.current_input_topic_name = None
     self.current_start_time = None
     self.user_config = json.loads(instance_config.function_details.userConfig) 
\
       if instance_config.function_details.userConfig \
@@ -73,7 +73,6 @@ class ContextImpl(pulsar.Context):
   # Called on a per message basis to set the context for the current message
   def set_current_message_context(self, message, topic):
     self.message = message
-    self.current_input_topic_name = topic
     self.current_start_time = time.time()
 
   def get_message_id(self):
@@ -89,7 +88,7 @@ class ContextImpl(pulsar.Context):
     return self.message.properties()
 
   def get_current_message_topic_name(self):
-    return self.current_input_topic_name
+    return self.message.topic_name()
 
   def get_function_name(self):
     return self.instance_config.function_details.name
@@ -176,9 +175,19 @@ class ContextImpl(pulsar.Context):
     self.publish_producers[topic_name].send_async(output_bytes, 
partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), 
properties=properties)
 
   def ack(self, msgid, topic):
-    if topic not in self.consumers:
-      raise ValueError('Invalid topicname %s' % topic)
-    self.consumers[topic].acknowledge(msgid)
+    topic_consumer = None
+    if topic in self.consumers:
+      topic_consumer = self.consumers[topic]
+    else:
+      # if this topic is a partitioned topic
+      m = re.search('(.+)-partition-(\d+)', topic)
+      if not m:
+        raise ValueError('Invalid topicname %s' % topic)
+      elif m.group(1) in self.consumers:
+        topic_consumer = self.consumers[m.group(1)]
+      else:
+        raise ValueError('Invalid topicname %s' % topic)
+    topic_consumer.acknowledge(msgid)
 
   def get_and_reset_metrics(self):
     metrics = self.get_metrics()
diff --git a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh 
b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
index 9b33c24..7005b9b 100644
--- a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
+++ b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
@@ -22,9 +22,10 @@
 # Make sure dependencies are installed
 pip install mock --user
 pip install protobuf --user
+pip install fastavro --user
 
 CUR_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
 PULSAR_HOME=$CUR_DIR/../../../../
 
 # run instance tests
-PULSAR_HOME=${PULSAR_HOME} 
PYTHONPATH=${PULSAR_HOME}/pulsar-functions/instance/target/python-instance 
python -m unittest discover -v 
${PULSAR_HOME}/pulsar-functions/instance/target/python-instance/tests
\ No newline at end of file
+PULSAR_HOME=${PULSAR_HOME} 
PYTHONPATH=${PULSAR_HOME}/pulsar-functions/instance/target/python-instance 
python -m unittest discover -v 
${PULSAR_HOME}/pulsar-functions/instance/target/python-instance/tests
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py 
b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 0071e2f..8b92fa8 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -20,9 +20,12 @@
 
 # DEPENDENCIES:  unittest2,mock
 
+from mock import Mock
+import sys
+sys.modules['prometheus_client'] = Mock()
+
 from contextimpl import ContextImpl
 from python_instance import InstanceConfig
-from mock import Mock
 from pulsar import Message
 
 import Function_pb2
@@ -68,4 +71,21 @@ class TestContextImpl(unittest.TestCase):
     self.assertEqual(args[1].args[1], "test_topic_name")
     self.assertEqual(args[1].args[2], "test_message_id")
 
+  def test_context_ack_partitionedtopic(self):
+    instance_id = 'test_instance_id'
+    function_id = 'test_function_id'
+    function_version = 'test_function_version'
+    function_details = Function_pb2.FunctionDetails()
+    max_buffered_tuples = 100;
+    instance_config = InstanceConfig(instance_id, function_id, 
function_version, function_details, max_buffered_tuples)
+    logger = log.Log
+    pulsar_client = Mock()
+    user_code=__file__
+    consumer = Mock()
+    consumer.acknowledge = Mock(return_value=None)
+    consumers = {"mytopic" : consumer}
+    context_impl = ContextImpl(instance_config, logger, pulsar_client, 
user_code, consumers, None, None, None, None)
+    context_impl.ack("test_message_id", "mytopic-partition-3")
 
+    args, kwargs = consumer.acknowledge.call_args
+    self.assertEqual(args[0], "test_message_id")
\ No newline at end of file

Reply via email to