This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 736d82d Enhanced Pulsar Message API as well as functions context to get more details about messages (#3136) 736d82d is described below commit 736d82dcb26d38bb83830f8f737699531bdebae8 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Dec 6 18:37:06 2018 -0800 Enhanced Pulsar Message API as well as functions context to get more details about messages (#3136) --- pulsar-client-cpp/python/pulsar/__init__.py | 25 +++++++++++++++++----- .../python/pulsar/functions/context.py | 15 +++++++++++++ pulsar-client-cpp/python/pulsar_test.py | 1 + .../instance/src/main/python/contextimpl.py | 17 +++++++++++---- .../instance/src/main/python/python_instance.py | 2 +- 5 files changed, 50 insertions(+), 10 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 4a90f57..794c006 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -167,6 +167,12 @@ class Message: """ return self._message.publish_timestamp() + def event_timestamp(self): + """ + Get the timestamp in milliseconds with the message event time. + """ + return self._message.event_timestamp() + def message_id(self): """ The message ID that can be used to refere to this particular message. @@ -672,7 +678,8 @@ class Producer: partition_key=None, sequence_id=None, replication_clusters=None, - disable_replication=False + disable_replication=False, + event_timestamp=None, ): """ Publish a message on the topic. Blocks until the message is acknowledged @@ -698,9 +705,11 @@ class Producer: the message will replicate according to the namespace configuration. * `disable_replication`: Do not replicate this message. 
+ * `event_timestamp`: + Timestamp in millis of the timestamp of event creation """ msg = self._build_msg(content, properties, partition_key, sequence_id, - replication_clusters, disable_replication) + replication_clusters, disable_replication, event_timestamp) return self._producer.send(msg) def send_async(self, content, callback, @@ -708,7 +717,8 @@ class Producer: partition_key=None, sequence_id=None, replication_clusters=None, - disable_replication=False + disable_replication=False, + event_timestamp=None ): """ Send a message asynchronously. @@ -748,9 +758,11 @@ class Producer: configuration. * `disable_replication`: Do not replicate this message. + * `event_timestamp`: + Timestamp in millis of the timestamp of event creation """ msg = self._build_msg(content, properties, partition_key, sequence_id, - replication_clusters, disable_replication) + replication_clusters, disable_replication, event_timestamp) self._producer.send_async(msg, callback) def close(self): @@ -760,13 +772,14 @@ class Producer: self._producer.close() def _build_msg(self, content, properties, partition_key, sequence_id, - replication_clusters, disable_replication): + replication_clusters, disable_replication, event_timestamp): _check_type(bytes, content, 'content') _check_type_or_none(dict, properties, 'properties') _check_type_or_none(str, partition_key, 'partition_key') _check_type_or_none(int, sequence_id, 'sequence_id') _check_type_or_none(list, replication_clusters, 'replication_clusters') _check_type(bool, disable_replication, 'disable_replication') + _check_type_or_none(int, event_timestamp, 'event_timestamp') mb = _pulsar.MessageBuilder() mb.content(content) @@ -781,6 +794,8 @@ class Producer: mb.replication_clusters(replication_clusters) if disable_replication: mb.disable_replication(disable_replication) + if event_timestamp: + mb.event_timestamp(event_timestamp) return mb.build() diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py 
b/pulsar-client-cpp/python/pulsar/functions/context.py index 6575f7a..54ba7f5 100644 --- a/pulsar-client-cpp/python/pulsar/functions/context.py +++ b/pulsar-client-cpp/python/pulsar/functions/context.py @@ -50,6 +50,21 @@ class Context(object): pass @abstractmethod + def get_message_key(self): + """Return the key of the current message that we are processing""" + pass + + @abstractmethod + def get_message_eventtime(self): + """Return the event time of the current message that we are processing""" + pass + + @abstractmethod + def get_message_properties(self): + """Return the message properties kv map of the current message that we are processing""" + pass + + @abstractmethod def get_current_message_topic_name(self): """Returns the topic name of the message that we are processing""" pass diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index a78505c..cf95e3e 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -478,6 +478,7 @@ class PulsarTest(TestCase): self._check_value_error(lambda: producer.send(content, sequence_id='test')) self._check_value_error(lambda: producer.send(content, replication_clusters=5)) self._check_value_error(lambda: producer.send(content, disable_replication='test')) + self._check_value_error(lambda: producer.send(content, event_timestamp='test')) client.close() def test_client_argument_errors(self): diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 381d612..a9b9b1a 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -47,7 +47,7 @@ class ContextImpl(pulsar.Context): self.secrets_provider = secrets_provider self.publish_producers = {} self.publish_serializers = {} - self.current_message_id = None + self.message = None self.current_input_topic_name = None self.current_start_time = None 
self.user_config = json.loads(instance_config.function_details.userConfig) \ @@ -64,13 +64,22 @@ class ContextImpl(pulsar.Context): ContextImpl.user_metrics_label_names) # Called on a per message basis to set the context for the current message - def set_current_message_context(self, msgid, topic): - self.current_message_id = msgid + def set_current_message_context(self, message, topic): + self.message = message self.current_input_topic_name = topic self.current_start_time = time.time() def get_message_id(self): - return self.current_message_id + return self.message.message_id() + + def get_message_key(self): + return self.message.partition_key() + + def get_message_eventtime(self): + return self.message.event_timestamp() + + def get_message_properties(self): + return self.message.properties() def get_current_message_topic_name(self): return self.current_input_topic_name diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 01593fa..815865b 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -191,7 +191,7 @@ class PythonInstance(object): # deserialize message input_object = msg.serde.deserialize(msg.message.data()) # set current message in context - self.contextimpl.set_current_message_context(msg.message.message_id(), msg.topic) + self.contextimpl.set_current_message_context(msg.message, msg.topic) output_object = None self.saved_log_handler = None if self.log_topic_handler is not None: