This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new bceca1d Fix python functions (#1663) bceca1d is described below commit bceca1dfd0c697c56dd07d3d983a28091e62f239 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri Apr 27 08:37:32 2018 -0700 Fix python functions (#1663) * Fix Python Functions * Added licence header * Fixed unittest --- .../instance/src/main/python/Function_pb2.py | 146 +++++++++++++++++---- .../src/main/python/python_instance_main.py | 2 +- .../proto/src/main/proto/Function.proto | 1 + .../pulsar/functions/runtime/ProcessRuntime.java | 14 +- .../functions/runtime/ProcessRuntimeTest.java | 5 +- 5 files changed, 135 insertions(+), 33 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py index eeb1fab..0f65991 100644 --- a/pulsar-functions/instance/src/main/python/Function_pb2.py +++ b/pulsar-functions/instance/src/main/python/Function_pb2.py @@ -41,7 +41,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='Function.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xac\x06\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12H\n\x11\x63ustomSerdeInputs\x18\x05 \x03(\x0b\x32-.proto.FunctionDetails.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12\x10\n\x08logTopic\x18\x08 \x01(\t\x12I\n\x14processingGuarantees\x18 [...] + serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xd5\x06\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12H\n\x11\x63ustomSerdeInputs\x18\x05 \x03(\x0b\x32-.proto.FunctionDetails.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12\x10\n\x08logTopic\x18\x08 \x01(\t\x12I\n\x14processingGuarantees\x18 [...] ) @@ -67,8 +67,8 @@ _FUNCTIONDETAILS_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=679, - serialized_end=758, + serialized_start=706, + serialized_end=785, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_PROCESSINGGUARANTEES) @@ -86,11 +86,15 @@ _FUNCTIONDETAILS_SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor( name='EXCLUSIVE', index=1, number=1, options=None, type=None), + _descriptor.EnumValueDescriptor( + name='FAILOVER', index=2, number=2, + options=None, + type=None), ], containing_type=None, options=None, - serialized_start=760, - serialized_end=805, + serialized_start=787, + serialized_end=846, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_SUBSCRIPTIONTYPE) @@ -111,8 +115,8 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=807, - serialized_end=838, + serialized_start=848, + serialized_end=879, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME) @@ -150,8 +154,8 @@ _FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=570, - serialized_end=626, + serialized_start=597, + serialized_end=653, ) _FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor( @@ -187,8 +191,8 @@ _FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=628, - serialized_end=677, + serialized_start=655, + serialized_end=704, ) _FUNCTIONDETAILS = _descriptor.Descriptor( @@ -304,9 +308,9 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='fqfn', full_name='proto.FunctionDetails.fqfn', index=15, - number=16, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), + name='source', full_name='proto.FunctionDetails.source', index=15, + number=16, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), @@ -326,7 +330,82 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( oneofs=[ ], serialized_start=26, - serialized_end=838, + serialized_end=879, +) + + +_CONNECTORDETAILS_CONFIGSENTRY = _descriptor.Descriptor( + name='ConfigsEntry', + full_name='proto.ConnectorDetails.ConfigsEntry', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='key', full_name='proto.ConnectorDetails.ConfigsEntry.key', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='value', full_name='proto.ConnectorDetails.ConfigsEntry.value', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')), + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=976, + serialized_end=1022, +) + +_CONNECTORDETAILS = _descriptor.Descriptor( + name='ConnectorDetails', + full_name='proto.ConnectorDetails', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='className', full_name='proto.ConnectorDetails.className', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='configs', full_name='proto.ConnectorDetails.configs', index=1, + number=4, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[_CONNECTORDETAILS_CONFIGSENTRY, ], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=882, + serialized_end=1022, ) @@ -356,8 +435,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=840, - serialized_end=886, + serialized_start=1024, + serialized_end=1070, ) @@ -408,8 +487,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=889, - serialized_end=1050, + serialized_start=1073, + serialized_end=1234, ) @@ -446,8 +525,8 @@ _INSTANCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1052, - serialized_end=1133, + serialized_start=1236, + serialized_end=1317, ) @@ -484,8 +563,8 @@ _ASSIGNMENT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1135, - serialized_end=1200, + serialized_start=1319, + serialized_end=1384, ) _FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY.containing_type = _FUNCTIONDETAILS @@ -495,14 +574,18 @@ _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _FUNCTIONDET _FUNCTIONDETAILS.fields_by_name['userConfig'].message_type = _FUNCTIONDETAILS_USERCONFIGENTRY _FUNCTIONDETAILS.fields_by_name['subscriptionType'].enum_type = _FUNCTIONDETAILS_SUBSCRIPTIONTYPE _FUNCTIONDETAILS.fields_by_name['runtime'].enum_type = _FUNCTIONDETAILS_RUNTIME +_FUNCTIONDETAILS.fields_by_name['source'].message_type = _CONNECTORDETAILS _FUNCTIONDETAILS_PROCESSINGGUARANTEES.containing_type = _FUNCTIONDETAILS _FUNCTIONDETAILS_SUBSCRIPTIONTYPE.containing_type = _FUNCTIONDETAILS _FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS +_CONNECTORDETAILS_CONFIGSENTRY.containing_type = _CONNECTORDETAILS +_CONNECTORDETAILS.fields_by_name['configs'].message_type = _CONNECTORDETAILS_CONFIGSENTRY _FUNCTIONMETADATA.fields_by_name['functionDetails'].message_type = _FUNCTIONDETAILS _FUNCTIONMETADATA.fields_by_name['packageLocation'].message_type = _PACKAGELOCATIONMETADATA _INSTANCE.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA _ASSIGNMENT.fields_by_name['instance'].message_type = _INSTANCE DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS +DESCRIPTOR.message_types_by_name['ConnectorDetails'] = _CONNECTORDETAILS DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = _PACKAGELOCATIONMETADATA DESCRIPTOR.message_types_by_name['FunctionMetaData'] = _FUNCTIONMETADATA DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE @@ -532,6 +615,21 @@ _sym_db.RegisterMessage(FunctionDetails) _sym_db.RegisterMessage(FunctionDetails.CustomSerdeInputsEntry) _sym_db.RegisterMessage(FunctionDetails.UserConfigEntry) +ConnectorDetails = _reflection.GeneratedProtocolMessageType('ConnectorDetails', (_message.Message,), dict( + + ConfigsEntry = _reflection.GeneratedProtocolMessageType('ConfigsEntry', (_message.Message,), dict( + DESCRIPTOR = _CONNECTORDETAILS_CONFIGSENTRY, + __module__ = 'Function_pb2' + # @@protoc_insertion_point(class_scope:proto.ConnectorDetails.ConfigsEntry) + )) + , + DESCRIPTOR = _CONNECTORDETAILS, + __module__ = 'Function_pb2' + # @@protoc_insertion_point(class_scope:proto.ConnectorDetails) + )) +_sym_db.RegisterMessage(ConnectorDetails) +_sym_db.RegisterMessage(ConnectorDetails.ConfigsEntry) + PackageLocationMetaData = _reflection.GeneratedProtocolMessageType('PackageLocationMetaData', (_message.Message,), dict( DESCRIPTOR = _PACKAGELOCATIONMETADATA, __module__ = 'Function_pb2' @@ -567,4 +665,6 @@ _FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY.has_options = True _FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) _FUNCTIONDETAILS_USERCONFIGENTRY.has_options = True _FUNCTIONDETAILS_USERCONFIGENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) +_CONNECTORDETAILS_CONFIGSENTRY.has_options = True +_CONNECTORDETAILS_CONFIGSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) # @@protoc_insertion_point(module_scope) diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 42b1af3..8a29dc5 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -110,7 +110,7 @@ def main(): if args.output_serde_classname != None and len(args.output_serde_classname) != 0: function_details.outputSerdeClassName = args.output_serde_classname function_details.processingGuarantees = Function_pb2.FunctionDetails.ProcessingGuarantees.Value(args.processing_guarantees) - function_details.subscriptionType = Function_pb2.FunctionDetails.SubscriptionType.Values(args.subscription_type) + function_details.subscriptionType = Function_pb2.FunctionDetails.SubscriptionType.Value(args.subscription_type) if args.auto_ack == "true": function_details.autoAck = True else: diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 1aeb21a..30dd9b5 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -32,6 +32,7 @@ message FunctionDetails { enum SubscriptionType { SHARED = 0; EXCLUSIVE = 1; + FAILOVER = 2; } enum Runtime { JAVA = 0; diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 8da2b9d..6a46097 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -175,12 +175,14 @@ class ProcessRuntime implements Runtime { instancePort = findAvailablePort(); args.add("--port"); args.add(String.valueOf(instancePort)); - args.add("--source_classname"); - args.add(instanceConfig.getFunctionDetails().getSource().getClassName()); - Map<String, String> sourceConfigs = instanceConfig.getFunctionDetails().getSource().getConfigsMap(); - if (sourceConfigs != null && !sourceConfigs.isEmpty()) { - args.add("--source_config"); - args.add(new Gson().toJson(sourceConfigs)); + if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { + args.add("--source_classname"); + args.add(instanceConfig.getFunctionDetails().getSource().getClassName()); + Map<String, String> sourceConfigs = instanceConfig.getFunctionDetails().getSource().getConfigsMap(); + if (sourceConfigs != null && !sourceConfigs.isEmpty()) { + args.add("--source_config"); + args.add(new Gson().toJson(sourceConfigs)); + } } return args; diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index 67ac82f..189067a 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -133,7 +133,7 @@ public class ProcessRuntimeTest { ProcessRuntime container = factory.createContainer(config, userJarFile); List<String> args = container.getProcessArgs(); - assertEquals(args.size(), 44); + assertEquals(args.size(), 42); String expectedArgs = "python " + pythonInstanceFile + " --py " + userJarFile + " --logging_directory " + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id " @@ -150,8 +150,7 @@ public class ProcessRuntimeTest { + " --output_serde_classname " + config.getFunctionDetails().getOutputSerdeClassName() + " --processing_guarantees ATLEAST_ONCE" + " --pulsar_serviceurl " + pulsarServiceUrl - + " --max_buffered_tuples 1024 --port " + args.get(41) - + " --source_classname " + config.getFunctionDetails().getSource().getClassName(); + + " --max_buffered_tuples 1024 --port " + args.get(41); assertEquals(expectedArgs, String.join(" ", args)); } -- To stop receiving notification emails like this one, please contact si...@apache.org.