This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bceca1d  Fix python functions (#1663)
bceca1d is described below

commit bceca1dfd0c697c56dd07d3d983a28091e62f239
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Fri Apr 27 08:37:32 2018 -0700

    Fix python functions (#1663)
    
    * Fix Python Functions
    
    * Added licence header
    
    * Fixed unittest
---
 .../instance/src/main/python/Function_pb2.py       | 146 +++++++++++++++++----
 .../src/main/python/python_instance_main.py        |   2 +-
 .../proto/src/main/proto/Function.proto            |   1 +
 .../pulsar/functions/runtime/ProcessRuntime.java   |  14 +-
 .../functions/runtime/ProcessRuntimeTest.java      |   5 +-
 5 files changed, 135 insertions(+), 33 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index eeb1fab..0f65991 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -41,7 +41,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xac\x06\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 
\x01(\t\x12H\n\x11\x63ustomSerdeInputs\x18\x05 
\x03(\x0b\x32-.proto.FunctionDetails.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06
 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12\x10\n\x08logTopic\x18\x08 
\x01(\t\x12I\n\x14processingGuarantees\x18 [...]
+  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xd5\x06\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 
\x01(\t\x12H\n\x11\x63ustomSerdeInputs\x18\x05 
\x03(\x0b\x32-.proto.FunctionDetails.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06
 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12\x10\n\x08logTopic\x18\x08 
\x01(\t\x12I\n\x14processingGuarantees\x18 [...]
 )
 
 
@@ -67,8 +67,8 @@ _FUNCTIONDETAILS_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=679,
-  serialized_end=758,
+  serialized_start=706,
+  serialized_end=785,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_PROCESSINGGUARANTEES)
 
@@ -86,11 +86,15 @@ _FUNCTIONDETAILS_SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
       name='EXCLUSIVE', index=1, number=1,
       options=None,
       type=None),
+    _descriptor.EnumValueDescriptor(
+      name='FAILOVER', index=2, number=2,
+      options=None,
+      type=None),
   ],
   containing_type=None,
   options=None,
-  serialized_start=760,
-  serialized_end=805,
+  serialized_start=787,
+  serialized_end=846,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_SUBSCRIPTIONTYPE)
 
@@ -111,8 +115,8 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=807,
-  serialized_end=838,
+  serialized_start=848,
+  serialized_end=879,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
 
@@ -150,8 +154,8 @@ _FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=570,
-  serialized_end=626,
+  serialized_start=597,
+  serialized_end=653,
 )
 
 _FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor(
@@ -187,8 +191,8 @@ _FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=628,
-  serialized_end=677,
+  serialized_start=655,
+  serialized_end=704,
 )
 
 _FUNCTIONDETAILS = _descriptor.Descriptor(
@@ -304,9 +308,9 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='fqfn', full_name='proto.FunctionDetails.fqfn', index=15,
-      number=16, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      name='source', full_name='proto.FunctionDetails.source', index=15,
+      number=16, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
@@ -326,7 +330,82 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=26,
-  serialized_end=838,
+  serialized_end=879,
+)
+
+
+_CONNECTORDETAILS_CONFIGSENTRY = _descriptor.Descriptor(
+  name='ConfigsEntry',
+  full_name='proto.ConnectorDetails.ConfigsEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='proto.ConnectorDetails.ConfigsEntry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='proto.ConnectorDetails.ConfigsEntry.value', 
index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), 
_b('8\001')),
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=976,
+  serialized_end=1022,
+)
+
+_CONNECTORDETAILS = _descriptor.Descriptor(
+  name='ConnectorDetails',
+  full_name='proto.ConnectorDetails',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='className', full_name='proto.ConnectorDetails.className', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='configs', full_name='proto.ConnectorDetails.configs', index=1,
+      number=4, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[_CONNECTORDETAILS_CONFIGSENTRY, ],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=882,
+  serialized_end=1022,
 )
 
 
@@ -356,8 +435,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=840,
-  serialized_end=886,
+  serialized_start=1024,
+  serialized_end=1070,
 )
 
 
@@ -408,8 +487,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=889,
-  serialized_end=1050,
+  serialized_start=1073,
+  serialized_end=1234,
 )
 
 
@@ -446,8 +525,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1052,
-  serialized_end=1133,
+  serialized_start=1236,
+  serialized_end=1317,
 )
 
 
@@ -484,8 +563,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1135,
-  serialized_end=1200,
+  serialized_start=1319,
+  serialized_end=1384,
 )
 
 _FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY.containing_type = _FUNCTIONDETAILS
@@ -495,14 +574,18 @@ _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _FUNCTIONDET
 _FUNCTIONDETAILS.fields_by_name['userConfig'].message_type = 
_FUNCTIONDETAILS_USERCONFIGENTRY
 _FUNCTIONDETAILS.fields_by_name['subscriptionType'].enum_type = 
_FUNCTIONDETAILS_SUBSCRIPTIONTYPE
 _FUNCTIONDETAILS.fields_by_name['runtime'].enum_type = _FUNCTIONDETAILS_RUNTIME
+_FUNCTIONDETAILS.fields_by_name['source'].message_type = _CONNECTORDETAILS
 _FUNCTIONDETAILS_PROCESSINGGUARANTEES.containing_type = _FUNCTIONDETAILS
 _FUNCTIONDETAILS_SUBSCRIPTIONTYPE.containing_type = _FUNCTIONDETAILS
 _FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS
+_CONNECTORDETAILS_CONFIGSENTRY.containing_type = _CONNECTORDETAILS
+_CONNECTORDETAILS.fields_by_name['configs'].message_type = 
_CONNECTORDETAILS_CONFIGSENTRY
 _FUNCTIONMETADATA.fields_by_name['functionDetails'].message_type = 
_FUNCTIONDETAILS
 _FUNCTIONMETADATA.fields_by_name['packageLocation'].message_type = 
_PACKAGELOCATIONMETADATA
 _INSTANCE.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA
 _ASSIGNMENT.fields_by_name['instance'].message_type = _INSTANCE
 DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS
+DESCRIPTOR.message_types_by_name['ConnectorDetails'] = _CONNECTORDETAILS
 DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = 
_PACKAGELOCATIONMETADATA
 DESCRIPTOR.message_types_by_name['FunctionMetaData'] = _FUNCTIONMETADATA
 DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE
@@ -532,6 +615,21 @@ _sym_db.RegisterMessage(FunctionDetails)
 _sym_db.RegisterMessage(FunctionDetails.CustomSerdeInputsEntry)
 _sym_db.RegisterMessage(FunctionDetails.UserConfigEntry)
 
+ConnectorDetails = 
_reflection.GeneratedProtocolMessageType('ConnectorDetails', 
(_message.Message,), dict(
+
+  ConfigsEntry = _reflection.GeneratedProtocolMessageType('ConfigsEntry', 
(_message.Message,), dict(
+    DESCRIPTOR = _CONNECTORDETAILS_CONFIGSENTRY,
+    __module__ = 'Function_pb2'
+    # @@protoc_insertion_point(class_scope:proto.ConnectorDetails.ConfigsEntry)
+    ))
+  ,
+  DESCRIPTOR = _CONNECTORDETAILS,
+  __module__ = 'Function_pb2'
+  # @@protoc_insertion_point(class_scope:proto.ConnectorDetails)
+  ))
+_sym_db.RegisterMessage(ConnectorDetails)
+_sym_db.RegisterMessage(ConnectorDetails.ConfigsEntry)
+
 PackageLocationMetaData = 
_reflection.GeneratedProtocolMessageType('PackageLocationMetaData', 
(_message.Message,), dict(
   DESCRIPTOR = _PACKAGELOCATIONMETADATA,
   __module__ = 'Function_pb2'
@@ -567,4 +665,6 @@ _FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY.has_options = True
 _FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY._options = 
_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 _FUNCTIONDETAILS_USERCONFIGENTRY.has_options = True
 _FUNCTIONDETAILS_USERCONFIGENTRY._options = 
_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
+_CONNECTORDETAILS_CONFIGSENTRY.has_options = True
+_CONNECTORDETAILS_CONFIGSENTRY._options = 
_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 # @@protoc_insertion_point(module_scope)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 42b1af3..8a29dc5 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -110,7 +110,7 @@ def main():
   if args.output_serde_classname != None and len(args.output_serde_classname) 
!= 0:
     function_details.outputSerdeClassName = args.output_serde_classname
   function_details.processingGuarantees = 
Function_pb2.FunctionDetails.ProcessingGuarantees.Value(args.processing_guarantees)
-  function_details.subscriptionType = 
Function_pb2.FunctionDetails.SubscriptionType.Values(args.subscription_type)
+  function_details.subscriptionType = 
Function_pb2.FunctionDetails.SubscriptionType.Value(args.subscription_type)
   if args.auto_ack == "true":
     function_details.autoAck = True
   else:
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 1aeb21a..30dd9b5 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -32,6 +32,7 @@ message FunctionDetails {
     enum SubscriptionType {
         SHARED = 0;
         EXCLUSIVE = 1;
+        FAILOVER = 2;
     }
     enum Runtime {
         JAVA = 0;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 8da2b9d..6a46097 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -175,12 +175,14 @@ class ProcessRuntime implements Runtime {
         instancePort = findAvailablePort();
         args.add("--port");
         args.add(String.valueOf(instancePort));
-        args.add("--source_classname");
-        
args.add(instanceConfig.getFunctionDetails().getSource().getClassName());
-        Map<String, String> sourceConfigs = 
instanceConfig.getFunctionDetails().getSource().getConfigsMap();
-        if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
-            args.add("--source_config");
-            args.add(new Gson().toJson(sourceConfigs));
+        if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
+            args.add("--source_classname");
+            
args.add(instanceConfig.getFunctionDetails().getSource().getClassName());
+            Map<String, String> sourceConfigs = 
instanceConfig.getFunctionDetails().getSource().getConfigsMap();
+            if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
+                args.add("--source_config");
+                args.add(new Gson().toJson(sourceConfigs));
+            }
         }
 
         return args;
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 67ac82f..189067a 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -133,7 +133,7 @@ public class ProcessRuntimeTest {
 
         ProcessRuntime container = factory.createContainer(config, 
userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 44);
+        assertEquals(args.size(), 42);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
                 + logDirectory + "/functions" + " --logging_file " + 
config.getFunctionDetails().getName() + " --instance_id "
@@ -150,8 +150,7 @@ public class ProcessRuntimeTest {
                 + " --output_serde_classname " + 
config.getFunctionDetails().getOutputSerdeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(41)
-                + " --source_classname " + 
config.getFunctionDetails().getSource().getClassName();
+                + " --max_buffered_tuples 1024 --port " + args.get(41);
         assertEquals(expectedArgs, String.join(" ", args));
     }
 

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to