This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 22d7a6c  Added ability to specify producer config for functions and sources (#7721)
22d7a6c is described below

commit 22d7a6cbcc36c79be64d3f39f707139ab43889c6
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Tue Aug 4 18:34:58 2020 -0700

    Added ability to specify producer config for functions and sources (#7721)
    
    * Added ability to specify producer config for functions and sources
    
    * Fixed test
    
    * Fix test
    
    * Add generated function proto
    
    * Add header
    
    * Address comments
    
    Co-authored-by: Sanjeev Kulkarni <sanje...@splunk.com>
---
 .../pulsar/common/functions/FunctionConfig.java    |   3 +
 .../pulsar/common/functions/ProducerConfig.java    |  35 +-
 .../org/apache/pulsar/common/io/SourceConfig.java  |   3 +
 .../pulsar/functions/instance/ContextImpl.java     |   8 +
 .../functions/instance/JavaInstanceRunnable.java   |   4 +
 .../apache/pulsar/functions/sink/PulsarSink.java   |   8 +
 .../pulsar/functions/sink/PulsarSinkConfig.java    |   2 +
 .../instance/src/main/python/Function_pb2.py       | 610 ++++++++++++++++-----
 .../proto/src/main/proto/Function.proto            |   6 +
 .../functions/utils/FunctionConfigUtils.java       |  25 +-
 .../pulsar/functions/utils/SourceConfigUtils.java  |  22 +
 .../functions/utils/FunctionConfigUtilsTest.java   |  13 +-
 .../functions/utils/SourceConfigUtilsTest.java     |   6 +
 13 files changed, 589 insertions(+), 156 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 8c680e8..fa925c4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -77,6 +77,9 @@ public class FunctionConfig {
 
     private String output;
 
+    // Any configuration that need to be applied for producers
+    private ProducerConfig producerConfig;
+
     /**
      * Represents either a builtin schema type (eg: 'avro', 'json', ect) or the class name for a Schema
      * implementation.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
similarity index 59%
copy from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
index 4e47812..b28370e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
@@ -16,24 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.sink;
+package org.apache.pulsar.common.functions;
 
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.pulsar.common.functions.FunctionConfig;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
 
-import java.util.Map;
-
-@Getter
-@Setter
-@ToString
-public class PulsarSinkConfig {
-    private FunctionConfig.ProcessingGuarantees processingGuarantees;
-    private String topic;
-    private String serdeClassName;
-    private String schemaType;
-    private Map<String, String> schemaProperties;
-    private String typeClassName;
-    private boolean forwardSourceMessageProperty;
+/**
+ * Configuration of the producer inside the function.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode
+public class ProducerConfig {
+    private Integer maxPendingMessages;
+    private Integer maxPendingMessagesAcrossPartitions;
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index b3a5634..31a8634 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -24,6 +24,7 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.common.functions.Resources;
 
 /**
@@ -41,6 +42,8 @@ public class SourceConfig {
 
     private String topicName;
 
+    private ProducerConfig producerConfig;
+
     private String serdeClassName;
 
     private String schemaType;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index c61aee0..763d9f7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -108,6 +108,14 @@ class ContextImpl implements Context, SinkContext, SourceContext {
 
         this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
+        if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
+            if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) {
+                this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
+            }
+            if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
+                this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+            }
+        }
 
         if (config.getFunctionDetails().getUserConfig().isEmpty()) {
             userConfigs = new HashMap<>();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 29cb71f..45a8174 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -812,6 +812,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
                 pulsarSinkConfig.setSchemaProperties(sinkSpec.getSchemaPropertiesMap());
 
+                if (this.instanceConfig.getFunctionDetails().getSink().getProducerSpec() != null) {
+                    pulsarSinkConfig.setProducerSpec(this.instanceConfig.getFunctionDetails().getSink().getProducerSpec());
+                }
+
                 object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader);
             }
         } else {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 9aa8bc1..00d12c1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -103,6 +103,14 @@ public class PulsarSink<T> implements Sink<T> {
             if (producerName != null) {
                 builder.producerName(producerName);
             }
+            if (pulsarSinkConfig.getProducerSpec() != null) {
+                if (pulsarSinkConfig.getProducerSpec().getMaxPendingMessages() != 0) {
+                    builder.maxPendingMessages(pulsarSinkConfig.getProducerSpec().getMaxPendingMessages());
+                }
+                if (pulsarSinkConfig.getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
+                    builder.maxPendingMessagesAcrossPartitions(pulsarSinkConfig.getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+                }
+            }
 
             return builder.properties(properties).create();
         }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
index 4e47812..a4ba7e3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
@@ -22,6 +22,7 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.proto.Function;
 
 import java.util.Map;
 
@@ -36,4 +37,5 @@ public class PulsarSinkConfig {
     private Map<String, String> schemaProperties;
     private String typeClassName;
     private boolean forwardSourceMessageProperty;
+    private Function.ProducerSpec producerSpec;
 }
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 108ed79..203809d 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -17,7 +17,6 @@
 # under the License.
 #
 
-# -*- coding: utf-8 -*-
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: Function.proto
 
@@ -38,7 +37,8 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   package='proto',
   syntax='proto3',
   serialized_options=b'\n!org.apache.pulsar.functions.protoB\010Function',
-  
serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 
\x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 
\x01(\t\"\xa3\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
+  create_key=_descriptor._internal_create_key,
+  
serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01
 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 
\x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 
\x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 
\x01(\t\"\xe7\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 
\x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -46,24 +46,28 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   full_name='proto.ProcessingGuarantees',
   filename=None,
   file=DESCRIPTOR,
+  create_key=_descriptor._internal_create_key,
   values=[
     _descriptor.EnumValueDescriptor(
       name='ATLEAST_ONCE', index=0, number=0,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
     _descriptor.EnumValueDescriptor(
       name='ATMOST_ONCE', index=1, number=1,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
     _descriptor.EnumValueDescriptor(
       name='EFFECTIVELY_ONCE', index=2, number=2,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=2429,
-  serialized_end=2508,
+  serialized_start=3174,
+  serialized_end=3253,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -73,20 +77,28 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   full_name='proto.SubscriptionType',
   filename=None,
   file=DESCRIPTOR,
+  create_key=_descriptor._internal_create_key,
   values=[
     _descriptor.EnumValueDescriptor(
       name='SHARED', index=0, number=0,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
     _descriptor.EnumValueDescriptor(
       name='FAILOVER', index=1, number=1,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
+    _descriptor.EnumValueDescriptor(
+      name='KEY_SHARED', index=2, number=2,
+      serialized_options=None,
+      type=None,
+      create_key=_descriptor._internal_create_key),
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=2510,
-  serialized_end=2554,
+  serialized_start=3255,
+  serialized_end=3315,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -96,20 +108,23 @@ _SUBSCRIPTIONPOSITION = _descriptor.EnumDescriptor(
   full_name='proto.SubscriptionPosition',
   filename=None,
   file=DESCRIPTOR,
+  create_key=_descriptor._internal_create_key,
   values=[
     _descriptor.EnumValueDescriptor(
       name='LATEST', index=0, number=0,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
     _descriptor.EnumValueDescriptor(
       name='EARLIEST', index=1, number=1,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=2556,
-  serialized_end=2604,
+  serialized_start=3317,
+  serialized_end=3365,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONPOSITION)
 
@@ -119,20 +134,23 @@ _FUNCTIONSTATE = _descriptor.EnumDescriptor(
   full_name='proto.FunctionState',
   filename=None,
   file=DESCRIPTOR,
+  create_key=_descriptor._internal_create_key,
   values=[
     _descriptor.EnumValueDescriptor(
       name='RUNNING', index=0, number=0,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
     _descriptor.EnumValueDescriptor(
       name='STOPPED', index=1, number=1,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=2606,
-  serialized_end=2647,
+  serialized_start=3367,
+  serialized_end=3408,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONSTATE)
 
@@ -142,6 +160,7 @@ ATMOST_ONCE = 1
 EFFECTIVELY_ONCE = 2
 SHARED = 0
 FAILOVER = 1
+KEY_SHARED = 2
 LATEST = 0
 EARLIEST = 1
 RUNNING = 0
@@ -153,24 +172,28 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
   full_name='proto.FunctionDetails.Runtime',
   filename=None,
   file=DESCRIPTOR,
+  create_key=_descriptor._internal_create_key,
   values=[
     _descriptor.EnumValueDescriptor(
       name='JAVA', index=0, number=0,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
     _descriptor.EnumValueDescriptor(
       name='PYTHON', index=1, number=1,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
     _descriptor.EnumValueDescriptor(
       name='GO', index=2, number=3,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=717,
-  serialized_end=756,
+  serialized_start=785,
+  serialized_end=824,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
 
@@ -179,28 +202,33 @@ _FUNCTIONDETAILS_COMPONENTTYPE = 
_descriptor.EnumDescriptor(
   full_name='proto.FunctionDetails.ComponentType',
   filename=None,
   file=DESCRIPTOR,
+  create_key=_descriptor._internal_create_key,
   values=[
     _descriptor.EnumValueDescriptor(
       name='UNKNOWN', index=0, number=0,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
     _descriptor.EnumValueDescriptor(
       name='FUNCTION', index=1, number=1,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
     _descriptor.EnumValueDescriptor(
       name='SOURCE', index=2, number=2,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
     _descriptor.EnumValueDescriptor(
       name='SINK', index=3, number=3,
       serialized_options=None,
-      type=None),
+      type=None,
+      create_key=_descriptor._internal_create_key),
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=758,
-  serialized_end=822,
+  serialized_start=826,
+  serialized_end=890,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_COMPONENTTYPE)
 
@@ -211,6 +239,7 @@ _RESOURCES = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='cpu', full_name='proto.Resources.cpu', index=0,
@@ -218,21 +247,21 @@ _RESOURCES = _descriptor.Descriptor(
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='ram', full_name='proto.Resources.ram', index=1,
       number=2, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='disk', full_name='proto.Resources.disk', index=2,
       number=3, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -256,6 +285,7 @@ _RETRYDETAILS = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='maxMessageRetries', 
full_name='proto.RetryDetails.maxMessageRetries', index=0,
@@ -263,14 +293,14 @@ _RETRYDETAILS = _descriptor.Descriptor(
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='deadLetterTopic', full_name='proto.RetryDetails.deadLetterTopic', 
index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -294,6 +324,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='tenant', full_name='proto.FunctionDetails.tenant', index=0,
@@ -301,133 +332,154 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='namespace', full_name='proto.FunctionDetails.namespace', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='name', full_name='proto.FunctionDetails.name', index=2,
       number=3, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='className', full_name='proto.FunctionDetails.className', index=3,
       number=4, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='logTopic', full_name='proto.FunctionDetails.logTopic', index=4,
       number=5, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='processingGuarantees', 
full_name='proto.FunctionDetails.processingGuarantees', index=5,
       number=6, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6,
       number=7, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='secretsMap', full_name='proto.FunctionDetails.secretsMap', index=7,
       number=16, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='runtime', full_name='proto.FunctionDetails.runtime', index=8,
       number=8, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='autoAck', full_name='proto.FunctionDetails.autoAck', index=9,
       number=9, type=8, cpp_type=7, label=1,
       has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='parallelism', full_name='proto.FunctionDetails.parallelism', 
index=10,
       number=10, type=5, cpp_type=1, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='source', full_name='proto.FunctionDetails.source', index=11,
       number=11, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='sink', full_name='proto.FunctionDetails.sink', index=12,
       number=12, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='resources', full_name='proto.FunctionDetails.resources', index=13,
       number=13, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='packageUrl', full_name='proto.FunctionDetails.packageUrl', 
index=14,
       number=14, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='retryDetails', full_name='proto.FunctionDetails.retryDetails', 
index=15,
       number=15, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='runtimeFlags', full_name='proto.FunctionDetails.runtimeFlags', 
index=16,
       number=17, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='componentType', full_name='proto.FunctionDetails.componentType', 
index=17,
       number=18, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='customRuntimeOptions', 
full_name='proto.FunctionDetails.customRuntimeOptions', index=18,
       number=19, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='builtin', full_name='proto.FunctionDetails.builtin', index=19,
+      number=20, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='retainOrdering', full_name='proto.FunctionDetails.retainOrdering', 
index=20,
+      number=21, type=8, cpp_type=7, label=1,
+      has_default_value=False, default_value=False,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='retainKeyOrdering', 
full_name='proto.FunctionDetails.retainKeyOrdering', index=21,
+      number=22, type=8, cpp_type=7, label=1,
+      has_default_value=False, default_value=False,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -443,7 +495,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=147,
-  serialized_end=822,
+  serialized_end=890,
 )
 
 
@@ -453,6 +505,7 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='value', full_name='proto.ConsumerSpec.ReceiverQueueSize.value', 
index=0,
@@ -460,7 +513,7 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -473,8 +526,84 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=975,
-  serialized_end=1009,
+  serialized_start=1185,
+  serialized_end=1219,
+)
+
+_CONSUMERSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
+  name='SchemaPropertiesEntry',
+  full_name='proto.ConsumerSpec.SchemaPropertiesEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  create_key=_descriptor._internal_create_key,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='proto.ConsumerSpec.SchemaPropertiesEntry.key', 
index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='value', 
full_name='proto.ConsumerSpec.SchemaPropertiesEntry.value', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=b'8\001',
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=1221,
+  serialized_end=1276,
+)
+
+_CONSUMERSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
+  name='ConsumerPropertiesEntry',
+  full_name='proto.ConsumerSpec.ConsumerPropertiesEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  create_key=_descriptor._internal_create_key,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='proto.ConsumerSpec.ConsumerPropertiesEntry.key', 
index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='value', 
full_name='proto.ConsumerSpec.ConsumerPropertiesEntry.value', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=b'8\001',
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=1278,
+  serialized_end=1335,
 )
 
 _CONSUMERSPEC = _descriptor.Descriptor(
@@ -483,6 +612,7 @@ _CONSUMERSPEC = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='schemaType', full_name='proto.ConsumerSpec.schemaType', index=0,
@@ -490,32 +620,85 @@ _CONSUMERSPEC = _descriptor.Descriptor(
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='serdeClassName', full_name='proto.ConsumerSpec.serdeClassName', 
index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='isRegexPattern', full_name='proto.ConsumerSpec.isRegexPattern', 
index=2,
       number=3, type=8, cpp_type=7, label=1,
       has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='receiverQueueSize', 
full_name='proto.ConsumerSpec.receiverQueueSize', index=3,
       number=4, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='schemaProperties', 
full_name='proto.ConsumerSpec.schemaProperties', index=4,
+      number=5, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='consumerProperties', 
full_name='proto.ConsumerSpec.consumerProperties', index=5,
+      number=6, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+  ],
+  extensions=[
+  ],
+  nested_types=[_CONSUMERSPEC_RECEIVERQUEUESIZE, 
_CONSUMERSPEC_SCHEMAPROPERTIESENTRY, _CONSUMERSPEC_CONSUMERPROPERTIESENTRY, ],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=893,
+  serialized_end=1335,
+)
+
+
+_PRODUCERSPEC = _descriptor.Descriptor(
+  name='ProducerSpec',
+  full_name='proto.ProducerSpec',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  create_key=_descriptor._internal_create_key,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='maxPendingMessages', 
full_name='proto.ProducerSpec.maxPendingMessages', index=0,
+      number=1, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='maxPendingMessagesAcrossPartitions', 
full_name='proto.ProducerSpec.maxPendingMessagesAcrossPartitions', index=1,
+      number=2, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
-  nested_types=[_CONSUMERSPEC_RECEIVERQUEUESIZE, ],
+  nested_types=[],
   enum_types=[
   ],
   serialized_options=None,
@@ -524,8 +707,8 @@ _CONSUMERSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=825,
-  serialized_end=1009,
+  serialized_start=1337,
+  serialized_end=1423,
 )
 
 
@@ -535,6 +718,7 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = 
_descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='key', 
full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.key', index=0,
@@ -542,14 +726,14 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = 
_descriptor.Descriptor(
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='value', 
full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.value', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -562,8 +746,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1451,
-  serialized_end=1512,
+  serialized_start=1903,
+  serialized_end=1964,
 )
 
 _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -572,6 +756,7 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='key', full_name='proto.SourceSpec.InputSpecsEntry.key', index=0,
@@ -579,14 +764,14 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='value', full_name='proto.SourceSpec.InputSpecsEntry.value', 
index=1,
       number=2, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -599,8 +784,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1514,
-  serialized_end=1584,
+  serialized_start=1966,
+  serialized_end=2036,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -609,6 +794,7 @@ _SOURCESPEC = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='className', full_name='proto.SourceSpec.className', index=0,
@@ -616,84 +802,91 @@ _SOURCESPEC = _descriptor.Descriptor(
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='configs', full_name='proto.SourceSpec.configs', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='typeClassName', full_name='proto.SourceSpec.typeClassName', 
index=2,
       number=5, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', 
index=3,
       number=3, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='topicsToSerDeClassName', 
full_name='proto.SourceSpec.topicsToSerDeClassName', index=4,
       number=4, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=b'\030\001', file=DESCRIPTOR),
+      serialized_options=b'\030\001', file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='inputSpecs', full_name='proto.SourceSpec.inputSpecs', index=5,
       number=10, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='timeoutMs', full_name='proto.SourceSpec.timeoutMs', index=6,
       number=6, type=4, cpp_type=4, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='topicsPattern', full_name='proto.SourceSpec.topicsPattern', 
index=7,
       number=7, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=b'\030\001', file=DESCRIPTOR),
+      serialized_options=b'\030\001', file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='builtin', full_name='proto.SourceSpec.builtin', index=8,
       number=8, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='subscriptionName', full_name='proto.SourceSpec.subscriptionName', 
index=9,
       number=9, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='cleanupSubscription', 
full_name='proto.SourceSpec.cleanupSubscription', index=10,
       number=11, type=8, cpp_type=7, label=1,
       has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='subscriptionPosition', 
full_name='proto.SourceSpec.subscriptionPosition', index=11,
       number=12, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='negativeAckRedeliveryDelayMs', 
full_name='proto.SourceSpec.negativeAckRedeliveryDelayMs', index=12,
+      number=13, type=4, cpp_type=4, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -706,17 +899,94 @@ _SOURCESPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1012,
-  serialized_end=1584,
+  serialized_start=1426,
+  serialized_end=2036,
 )
 
 
+_SINKSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor(
+  name='SchemaPropertiesEntry',
+  full_name='proto.SinkSpec.SchemaPropertiesEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  create_key=_descriptor._internal_create_key,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='proto.SinkSpec.SchemaPropertiesEntry.key', 
index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='proto.SinkSpec.SchemaPropertiesEntry.value', 
index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=b'8\001',
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=1221,
+  serialized_end=1276,
+)
+
+_SINKSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor(
+  name='ConsumerPropertiesEntry',
+  full_name='proto.SinkSpec.ConsumerPropertiesEntry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  create_key=_descriptor._internal_create_key,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', full_name='proto.SinkSpec.ConsumerPropertiesEntry.key', 
index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='value', full_name='proto.SinkSpec.ConsumerPropertiesEntry.value', 
index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=b'8\001',
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=1278,
+  serialized_end=1335,
+)
+
 _SINKSPEC = _descriptor.Descriptor(
   name='SinkSpec',
   full_name='proto.SinkSpec',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='className', full_name='proto.SinkSpec.className', index=0,
@@ -724,60 +994,81 @@ _SINKSPEC = _descriptor.Descriptor(
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='configs', full_name='proto.SinkSpec.configs', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='typeClassName', full_name='proto.SinkSpec.typeClassName', index=2,
       number=5, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='topic', full_name='proto.SinkSpec.topic', index=3,
       number=3, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='producerSpec', full_name='proto.SinkSpec.producerSpec', index=4,
+      number=11, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
-      name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', 
index=4,
+      name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', 
index=5,
       number=4, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
-      name='builtin', full_name='proto.SinkSpec.builtin', index=5,
+      name='builtin', full_name='proto.SinkSpec.builtin', index=6,
       number=6, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
-      name='schemaType', full_name='proto.SinkSpec.schemaType', index=6,
+      name='schemaType', full_name='proto.SinkSpec.schemaType', index=7,
       number=7, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
-      name='forwardSourceMessageProperty', 
full_name='proto.SinkSpec.forwardSourceMessageProperty', index=7,
+      name='forwardSourceMessageProperty', 
full_name='proto.SinkSpec.forwardSourceMessageProperty', index=8,
       number=8, type=8, cpp_type=7, label=1,
       has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='schemaProperties', full_name='proto.SinkSpec.schemaProperties', 
index=9,
+      number=9, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='consumerProperties', 
full_name='proto.SinkSpec.consumerProperties', index=10,
+      number=10, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
-  nested_types=[],
+  nested_types=[_SINKSPEC_SCHEMAPROPERTIESENTRY, 
_SINKSPEC_CONSUMERPROPERTIESENTRY, ],
   enum_types=[
   ],
   serialized_options=None,
@@ -786,8 +1077,8 @@ _SINKSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1587,
-  serialized_end=1770,
+  serialized_start=2039,
+  serialized_end=2515,
 )
 
 
@@ -797,6 +1088,7 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='packagePath', 
full_name='proto.PackageLocationMetaData.packagePath', index=0,
@@ -804,14 +1096,14 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='originalFileName', 
full_name='proto.PackageLocationMetaData.originalFileName', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -824,8 +1116,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1772,
-  serialized_end=1844,
+  serialized_start=2517,
+  serialized_end=2589,
 )
 
 
@@ -835,6 +1127,7 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = 
_descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='key', full_name='proto.FunctionMetaData.InstanceStatesEntry.key', 
index=0,
@@ -842,14 +1135,14 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = 
_descriptor.Descriptor(
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='value', 
full_name='proto.FunctionMetaData.InstanceStatesEntry.value', index=1,
       number=2, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -862,8 +1155,8 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = 
_descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2140,
-  serialized_end=2215,
+  serialized_start=2885,
+  serialized_end=2960,
 )
 
 _FUNCTIONMETADATA = _descriptor.Descriptor(
@@ -872,6 +1165,7 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='functionDetails', 
full_name='proto.FunctionMetaData.functionDetails', index=0,
@@ -879,42 +1173,42 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='packageLocation', 
full_name='proto.FunctionMetaData.packageLocation', index=1,
       number=2, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='version', full_name='proto.FunctionMetaData.version', index=2,
       number=3, type=4, cpp_type=4, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='createTime', full_name='proto.FunctionMetaData.createTime', 
index=3,
       number=4, type=4, cpp_type=4, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='instanceStates', 
full_name='proto.FunctionMetaData.instanceStates', index=4,
       number=5, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='functionAuthSpec', 
full_name='proto.FunctionMetaData.functionAuthSpec', index=5,
       number=6, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -927,8 +1221,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1847,
-  serialized_end=2215,
+  serialized_start=2592,
+  serialized_end=2960,
 )
 
 
@@ -938,6 +1232,7 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='data', full_name='proto.FunctionAuthenticationSpec.data', index=0,
@@ -945,14 +1240,14 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
       has_default_value=False, default_value=b"",
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='provider', full_name='proto.FunctionAuthenticationSpec.provider', 
index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -965,8 +1260,8 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2217,
-  serialized_end=2277,
+  serialized_start=2962,
+  serialized_end=3022,
 )
 
 
@@ -976,6 +1271,7 @@ _INSTANCE = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='functionMetaData', full_name='proto.Instance.functionMetaData', 
index=0,
@@ -983,14 +1279,14 @@ _INSTANCE = _descriptor.Descriptor(
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='instanceId', full_name='proto.Instance.instanceId', index=1,
       number=2, type=5, cpp_type=1, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -1003,8 +1299,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2279,
-  serialized_end=2360,
+  serialized_start=3024,
+  serialized_end=3105,
 )
 
 
@@ -1014,6 +1310,7 @@ _ASSIGNMENT = _descriptor.Descriptor(
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
+  create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
       name='instance', full_name='proto.Assignment.instance', index=0,
@@ -1021,14 +1318,14 @@ _ASSIGNMENT = _descriptor.Descriptor(
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
     _descriptor.FieldDescriptor(
       name='workerId', full_name='proto.Assignment.workerId', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR,  
create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -1041,8 +1338,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2362,
-  serialized_end=2427,
+  serialized_start=3107,
+  serialized_end=3172,
 )
 
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = 
_PROCESSINGGUARANTEES
@@ -1055,7 +1352,11 @@ 
_FUNCTIONDETAILS.fields_by_name['componentType'].enum_type = _FUNCTIONDETAILS_CO
 _FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS
 _FUNCTIONDETAILS_COMPONENTTYPE.containing_type = _FUNCTIONDETAILS
 _CONSUMERSPEC_RECEIVERQUEUESIZE.containing_type = _CONSUMERSPEC
+_CONSUMERSPEC_SCHEMAPROPERTIESENTRY.containing_type = _CONSUMERSPEC
+_CONSUMERSPEC_CONSUMERPROPERTIESENTRY.containing_type = _CONSUMERSPEC
 _CONSUMERSPEC.fields_by_name['receiverQueueSize'].message_type = 
_CONSUMERSPEC_RECEIVERQUEUESIZE
+_CONSUMERSPEC.fields_by_name['schemaProperties'].message_type = 
_CONSUMERSPEC_SCHEMAPROPERTIESENTRY
+_CONSUMERSPEC.fields_by_name['consumerProperties'].message_type = 
_CONSUMERSPEC_CONSUMERPROPERTIESENTRY
 _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC
 _SOURCESPEC_INPUTSPECSENTRY.fields_by_name['value'].message_type = 
_CONSUMERSPEC
 _SOURCESPEC_INPUTSPECSENTRY.containing_type = _SOURCESPEC
@@ -1063,6 +1364,11 @@ _SOURCESPEC.fields_by_name['subscriptionType'].enum_type 
= _SUBSCRIPTIONTYPE
 _SOURCESPEC.fields_by_name['topicsToSerDeClassName'].message_type = 
_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY
 _SOURCESPEC.fields_by_name['inputSpecs'].message_type = 
_SOURCESPEC_INPUTSPECSENTRY
 _SOURCESPEC.fields_by_name['subscriptionPosition'].enum_type = 
_SUBSCRIPTIONPOSITION
+_SINKSPEC_SCHEMAPROPERTIESENTRY.containing_type = _SINKSPEC
+_SINKSPEC_CONSUMERPROPERTIESENTRY.containing_type = _SINKSPEC
+_SINKSPEC.fields_by_name['producerSpec'].message_type = _PRODUCERSPEC
+_SINKSPEC.fields_by_name['schemaProperties'].message_type = 
_SINKSPEC_SCHEMAPROPERTIESENTRY
+_SINKSPEC.fields_by_name['consumerProperties'].message_type = 
_SINKSPEC_CONSUMERPROPERTIESENTRY
 _FUNCTIONMETADATA_INSTANCESTATESENTRY.fields_by_name['value'].enum_type = 
_FUNCTIONSTATE
 _FUNCTIONMETADATA_INSTANCESTATESENTRY.containing_type = _FUNCTIONMETADATA
 _FUNCTIONMETADATA.fields_by_name['functionDetails'].message_type = 
_FUNCTIONDETAILS
@@ -1075,6 +1381,7 @@ DESCRIPTOR.message_types_by_name['Resources'] = _RESOURCES
 DESCRIPTOR.message_types_by_name['RetryDetails'] = _RETRYDETAILS
 DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS
 DESCRIPTOR.message_types_by_name['ConsumerSpec'] = _CONSUMERSPEC
+DESCRIPTOR.message_types_by_name['ProducerSpec'] = _PRODUCERSPEC
 DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC
 DESCRIPTOR.message_types_by_name['SinkSpec'] = _SINKSPEC
 DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = 
_PACKAGELOCATIONMETADATA
@@ -1117,12 +1424,35 @@ ConsumerSpec = 
_reflection.GeneratedProtocolMessageType('ConsumerSpec', (_messag
     # 
@@protoc_insertion_point(class_scope:proto.ConsumerSpec.ReceiverQueueSize)
     })
   ,
+
+  'SchemaPropertiesEntry' : 
_reflection.GeneratedProtocolMessageType('SchemaPropertiesEntry', 
(_message.Message,), {
+    'DESCRIPTOR' : _CONSUMERSPEC_SCHEMAPROPERTIESENTRY,
+    '__module__' : 'Function_pb2'
+    # 
@@protoc_insertion_point(class_scope:proto.ConsumerSpec.SchemaPropertiesEntry)
+    })
+  ,
+
+  'ConsumerPropertiesEntry' : 
_reflection.GeneratedProtocolMessageType('ConsumerPropertiesEntry', 
(_message.Message,), {
+    'DESCRIPTOR' : _CONSUMERSPEC_CONSUMERPROPERTIESENTRY,
+    '__module__' : 'Function_pb2'
+    # 
@@protoc_insertion_point(class_scope:proto.ConsumerSpec.ConsumerPropertiesEntry)
+    })
+  ,
   'DESCRIPTOR' : _CONSUMERSPEC,
   '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.ConsumerSpec)
   })
 _sym_db.RegisterMessage(ConsumerSpec)
 _sym_db.RegisterMessage(ConsumerSpec.ReceiverQueueSize)
+_sym_db.RegisterMessage(ConsumerSpec.SchemaPropertiesEntry)
+_sym_db.RegisterMessage(ConsumerSpec.ConsumerPropertiesEntry)
+
+ProducerSpec = _reflection.GeneratedProtocolMessageType('ProducerSpec', 
(_message.Message,), {
+  'DESCRIPTOR' : _PRODUCERSPEC,
+  '__module__' : 'Function_pb2'
+  # @@protoc_insertion_point(class_scope:proto.ProducerSpec)
+  })
+_sym_db.RegisterMessage(ProducerSpec)
 
 SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', 
(_message.Message,), {
 
@@ -1148,11 +1478,27 @@ 
_sym_db.RegisterMessage(SourceSpec.TopicsToSerDeClassNameEntry)
 _sym_db.RegisterMessage(SourceSpec.InputSpecsEntry)
 
 SinkSpec = _reflection.GeneratedProtocolMessageType('SinkSpec', 
(_message.Message,), {
+
+  'SchemaPropertiesEntry' : 
_reflection.GeneratedProtocolMessageType('SchemaPropertiesEntry', 
(_message.Message,), {
+    'DESCRIPTOR' : _SINKSPEC_SCHEMAPROPERTIESENTRY,
+    '__module__' : 'Function_pb2'
+    # 
@@protoc_insertion_point(class_scope:proto.SinkSpec.SchemaPropertiesEntry)
+    })
+  ,
+
+  'ConsumerPropertiesEntry' : 
_reflection.GeneratedProtocolMessageType('ConsumerPropertiesEntry', 
(_message.Message,), {
+    'DESCRIPTOR' : _SINKSPEC_CONSUMERPROPERTIESENTRY,
+    '__module__' : 'Function_pb2'
+    # 
@@protoc_insertion_point(class_scope:proto.SinkSpec.ConsumerPropertiesEntry)
+    })
+  ,
   'DESCRIPTOR' : _SINKSPEC,
   '__module__' : 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.SinkSpec)
   })
 _sym_db.RegisterMessage(SinkSpec)
+_sym_db.RegisterMessage(SinkSpec.SchemaPropertiesEntry)
+_sym_db.RegisterMessage(SinkSpec.ConsumerPropertiesEntry)
 
 PackageLocationMetaData = 
_reflection.GeneratedProtocolMessageType('PackageLocationMetaData', 
(_message.Message,), {
   'DESCRIPTOR' : _PACKAGELOCATIONMETADATA,
@@ -1199,9 +1545,13 @@ _sym_db.RegisterMessage(Assignment)
 
 
 DESCRIPTOR._options = None
+_CONSUMERSPEC_SCHEMAPROPERTIESENTRY._options = None
+_CONSUMERSPEC_CONSUMERPROPERTIESENTRY._options = None
 _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = None
 _SOURCESPEC_INPUTSPECSENTRY._options = None
 _SOURCESPEC.fields_by_name['topicsToSerDeClassName']._options = None
 _SOURCESPEC.fields_by_name['topicsPattern']._options = None
+_SINKSPEC_SCHEMAPROPERTIESENTRY._options = None
+_SINKSPEC_CONSUMERPROPERTIESENTRY._options = None
 _FUNCTIONMETADATA_INSTANCESTATESENTRY._options = None
 # @@protoc_insertion_point(module_scope)
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index f39fdfd..68cc936 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -101,6 +101,11 @@ message ConsumerSpec {
     map<string, string> consumerProperties = 6;
 }
 
+message ProducerSpec {
+    int32 maxPendingMessages = 1;
+    int32 maxPendingMessagesAcrossPartitions = 2;
+}
+
 message SourceSpec {
     string className = 1;
     // map in json format
@@ -138,6 +143,7 @@ message SinkSpec {
 
     // configs used only when functions output to sink
     string topic = 3;
+    ProducerSpec producerSpec = 11;
 
     string serDeClassName = 4;
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 342ba27..1b0e4fd 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -25,10 +25,7 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.Resources;
-import org.apache.pulsar.common.functions.WindowConfig;
+import org.apache.pulsar.common.functions.*;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.proto.Function;
@@ -198,6 +195,16 @@ public class FunctionConfigUtils {
         if (typeArgs != null) {
             sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
         }
+        if (functionConfig.getProducerConfig() != null) {
+            Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder();
+            if (functionConfig.getProducerConfig().getMaxPendingMessages() != null) {
+                pbldr.setMaxPendingMessages(functionConfig.getProducerConfig().getMaxPendingMessages());
+            }
+            if (functionConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions() != null) {
+                pbldr.setMaxPendingMessagesAcrossPartitions(functionConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions());
+            }
+            sinkSpecBuilder.setProducerSpec(pbldr.build());
+        }
         functionDetailsBuilder.setSink(sinkSpecBuilder);
 
         if (functionConfig.getTenant() != null) {
@@ -343,6 +350,16 @@ public class FunctionConfigUtils {
         if (!isEmpty(functionDetails.getSink().getSchemaType())) {
             
functionConfig.setOutputSchemaType(functionDetails.getSink().getSchemaType());
         }
+        if (functionDetails.getSink().getProducerSpec() != null) {
+            ProducerConfig producerConfig = new ProducerConfig();
+            if (functionDetails.getSink().getProducerSpec().getMaxPendingMessages() != 0) {
+                producerConfig.setMaxPendingMessages(functionDetails.getSink().getProducerSpec().getMaxPendingMessages());
+            }
+            if (functionDetails.getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
+                producerConfig.setMaxPendingMessagesAcrossPartitions(functionDetails.getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+            }
+            functionConfig.setProducerConfig(producerConfig);
+        }
         if (!isEmpty(functionDetails.getLogTopic())) {
             functionConfig.setLogTopic(functionDetails.getLogTopic());
         }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index a246e53..d912206 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -28,6 +28,7 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -146,6 +147,17 @@ public class SourceConfigUtils {
             sinkSpecBuilder.setTypeClassName(sourceDetails.getTypeArg());
         }
 
+        if (sourceConfig.getProducerConfig() != null) {
+            Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder();
+            if (sourceConfig.getProducerConfig().getMaxPendingMessages() != null) {
+                pbldr.setMaxPendingMessages(sourceConfig.getProducerConfig().getMaxPendingMessages());
+            }
+            if (sourceConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions() != null) {
+                pbldr.setMaxPendingMessagesAcrossPartitions(sourceConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions());
+            }
+            sinkSpecBuilder.setProducerSpec(pbldr.build());
+        }
+
         functionDetailsBuilder.setSink(sinkSpecBuilder);
 
         // use default resources if resources not set
@@ -215,6 +227,16 @@ public class SourceConfigUtils {
         if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) {
             sourceConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
         }
+        if (sinkSpec.getProducerSpec() != null) {
+            ProducerConfig producerConfig = new ProducerConfig();
+            if (sinkSpec.getProducerSpec().getMaxPendingMessages() != 0) {
+                producerConfig.setMaxPendingMessages(sinkSpec.getProducerSpec().getMaxPendingMessages());
+            }
+            if (sinkSpec.getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
+                producerConfig.setMaxPendingMessagesAcrossPartitions(sinkSpec.getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+            }
+            sourceConfig.setProducerConfig(producerConfig);
+        }
         if (functionDetails.hasResources()) {
             Resources resources = new Resources();
             resources.setCpu(functionDetails.getResources().getCpu());
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 9d8c0bc..e604980 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -20,10 +20,7 @@ package org.apache.pulsar.functions.utils;
 
 import com.google.gson.Gson;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.common.functions.ConsumerConfig;
-import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.Resources;
-import org.apache.pulsar.common.functions.WindowConfig;
+import org.apache.pulsar.common.functions.*;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
@@ -66,6 +63,10 @@ public class FunctionConfigUtilsTest {
         functionConfig.setAutoAck(true);
         functionConfig.setTimeoutMs(2000l);
         functionConfig.setRuntimeFlags("-DKerberos");
+        ProducerConfig producerConfig = new ProducerConfig();
+        producerConfig.setMaxPendingMessages(100);
+        producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+        functionConfig.setProducerConfig(producerConfig);
         Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
 
@@ -101,6 +102,10 @@ public class FunctionConfigUtilsTest {
         functionConfig.setAutoAck(true);
         functionConfig.setTimeoutMs(2000l);
         functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10));
+        ProducerConfig producerConfig = new ProducerConfig();
+        producerConfig.setMaxPendingMessages(100);
+        producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+        functionConfig.setProducerConfig(producerConfig);
         Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
 
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index d517026..65f29ef 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -23,6 +23,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.experimental.Accessors;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -349,6 +350,11 @@ public class SourceConfigUtilsTest extends PowerMockTestCase {
         configs.put("bootstrapServers", "server-1,server-2");
         configs.put("consumerConfigProperties", consumerConfigs);
 
+        ProducerConfig producerConfig = new ProducerConfig();
+        producerConfig.setMaxPendingMessages(100);
+        producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+        sourceConfig.setProducerConfig(producerConfig);
+
         sourceConfig.setConfigs(configs);
         return sourceConfig;
     }

Reply via email to