This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 736d82d  Enhanced Pulsar Message API as well as functions context to 
get more details about messages (#3136)
736d82d is described below

commit 736d82dcb26d38bb83830f8f737699531bdebae8
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Dec 6 18:37:06 2018 -0800

    Enhanced Pulsar Message API as well as functions context to get more 
details about messages (#3136)
---
 pulsar-client-cpp/python/pulsar/__init__.py        | 25 +++++++++++++++++-----
 .../python/pulsar/functions/context.py             | 15 +++++++++++++
 pulsar-client-cpp/python/pulsar_test.py            |  1 +
 .../instance/src/main/python/contextimpl.py        | 17 +++++++++++----
 .../instance/src/main/python/python_instance.py    |  2 +-
 5 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index 4a90f57..794c006 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -167,6 +167,12 @@ class Message:
         """
         return self._message.publish_timestamp()
 
+    def event_timestamp(self):
+        """
+        Get the timestamp in milliseconds with the message event time.
+        """
+        return self._message.event_timestamp()
+
     def message_id(self):
         """
         The message ID that can be used to refere to this particular message.
@@ -672,7 +678,8 @@ class Producer:
              partition_key=None,
              sequence_id=None,
              replication_clusters=None,
-             disable_replication=False
+             disable_replication=False,
+             event_timestamp=None,
              ):
         """
         Publish a message on the topic. Blocks until the message is 
acknowledged
@@ -698,9 +705,11 @@ class Producer:
           the message will replicate according to the namespace configuration.
         * `disable_replication`:
           Do not replicate this message.
+        * `event_timestamp`:
+          Timestamp in millis of the timestamp of event creation
         """
         msg = self._build_msg(content, properties, partition_key, sequence_id,
-                              replication_clusters, disable_replication)
+                              replication_clusters, disable_replication, 
event_timestamp)
         return self._producer.send(msg)
 
     def send_async(self, content, callback,
@@ -708,7 +717,8 @@ class Producer:
                    partition_key=None,
                    sequence_id=None,
                    replication_clusters=None,
-                   disable_replication=False
+                   disable_replication=False,
+                   event_timestamp=None
                    ):
         """
         Send a message asynchronously.
@@ -748,9 +758,11 @@ class Producer:
           configuration.
         * `disable_replication`:
           Do not replicate this message.
+        * `event_timestamp`:
+          Timestamp in millis of the timestamp of event creation
         """
         msg = self._build_msg(content, properties, partition_key, sequence_id,
-                              replication_clusters, disable_replication)
+                              replication_clusters, disable_replication, 
event_timestamp)
         self._producer.send_async(msg, callback)
 
     def close(self):
@@ -760,13 +772,14 @@ class Producer:
         self._producer.close()
 
     def _build_msg(self, content, properties, partition_key, sequence_id,
-                   replication_clusters, disable_replication):
+                   replication_clusters, disable_replication, event_timestamp):
         _check_type(bytes, content, 'content')
         _check_type_or_none(dict, properties, 'properties')
         _check_type_or_none(str, partition_key, 'partition_key')
         _check_type_or_none(int, sequence_id, 'sequence_id')
         _check_type_or_none(list, replication_clusters, 'replication_clusters')
         _check_type(bool, disable_replication, 'disable_replication')
+        _check_type_or_none(int, event_timestamp, 'event_timestamp')
 
         mb = _pulsar.MessageBuilder()
         mb.content(content)
@@ -781,6 +794,8 @@ class Producer:
             mb.replication_clusters(replication_clusters)
         if disable_replication:
             mb.disable_replication(disable_replication)
+        if event_timestamp:
+            mb.event_timestamp(event_timestamp)
         return mb.build()
 
 
diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py 
b/pulsar-client-cpp/python/pulsar/functions/context.py
index 6575f7a..54ba7f5 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -50,6 +50,21 @@ class Context(object):
     pass
 
   @abstractmethod
+  def get_message_key(self):
+    """Return the key of the current message that we are processing"""
+    pass
+
+  @abstractmethod
+  def get_message_eventtime(self):
+    """Return the event time of the current message that we are processing"""
+    pass
+
+  @abstractmethod
+  def get_message_properties(self):
+    """Return the message properties kv map of the current message that we are 
processing"""
+    pass
+
+  @abstractmethod
   def get_current_message_topic_name(self):
     """Returns the topic name of the message that we are processing"""
     pass
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index a78505c..cf95e3e 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -478,6 +478,7 @@ class PulsarTest(TestCase):
         self._check_value_error(lambda: producer.send(content, 
sequence_id='test'))
         self._check_value_error(lambda: producer.send(content, 
replication_clusters=5))
         self._check_value_error(lambda: producer.send(content, 
disable_replication='test'))
+        self._check_value_error(lambda: producer.send(content, 
event_timestamp='test'))
         client.close()
 
     def test_client_argument_errors(self):
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py 
b/pulsar-functions/instance/src/main/python/contextimpl.py
index 381d612..a9b9b1a 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -47,7 +47,7 @@ class ContextImpl(pulsar.Context):
     self.secrets_provider = secrets_provider
     self.publish_producers = {}
     self.publish_serializers = {}
-    self.current_message_id = None
+    self.message = None
     self.current_input_topic_name = None
     self.current_start_time = None
     self.user_config = json.loads(instance_config.function_details.userConfig) 
\
@@ -64,13 +64,22 @@ class ContextImpl(pulsar.Context):
                                         ContextImpl.user_metrics_label_names)
 
   # Called on a per message basis to set the context for the current message
-  def set_current_message_context(self, msgid, topic):
-    self.current_message_id = msgid
+  def set_current_message_context(self, message, topic):
+    self.message = message
     self.current_input_topic_name = topic
     self.current_start_time = time.time()
 
   def get_message_id(self):
-    return self.current_message_id
+    return self.message.message_id()
+
+  def get_message_key(self):
+    return self.message.partition_key()
+
+  def get_message_eventtime(self):
+    return self.message.event_timestamp()
+
+  def get_message_properties(self):
+    return self.message.properties()
 
   def get_current_message_topic_name(self):
     return self.current_input_topic_name
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 01593fa..815865b 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -191,7 +191,7 @@ class PythonInstance(object):
         # deserialize message
         input_object = msg.serde.deserialize(msg.message.data())
         # set current message in context
-        self.contextimpl.set_current_message_context(msg.message.message_id(), 
msg.topic)
+        self.contextimpl.set_current_message_context(msg.message, msg.topic)
         output_object = None
         self.saved_log_handler = None
         if self.log_topic_handler is not None:

Reply via email to