This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 22d7a6c Added ability to specify producer config for functions and sources (#7721) 22d7a6c is described below commit 22d7a6cbcc36c79be64d3f39f707139ab43889c6 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Tue Aug 4 18:34:58 2020 -0700 Added ability to specify producer config for functions and sources (#7721) * Added ability to specify producer config for functions and sources * Fixed test * Fix test * Add generated function proto * Add header * Address comments Co-authored-by: Sanjeev Kulkarni <sanje...@splunk.com> --- .../pulsar/common/functions/FunctionConfig.java | 3 + .../pulsar/common/functions/ProducerConfig.java | 35 +- .../org/apache/pulsar/common/io/SourceConfig.java | 3 + .../pulsar/functions/instance/ContextImpl.java | 8 + .../functions/instance/JavaInstanceRunnable.java | 4 + .../apache/pulsar/functions/sink/PulsarSink.java | 8 + .../pulsar/functions/sink/PulsarSinkConfig.java | 2 + .../instance/src/main/python/Function_pb2.py | 610 ++++++++++++++++----- .../proto/src/main/proto/Function.proto | 6 + .../functions/utils/FunctionConfigUtils.java | 25 +- .../pulsar/functions/utils/SourceConfigUtils.java | 22 + .../functions/utils/FunctionConfigUtilsTest.java | 13 +- .../functions/utils/SourceConfigUtilsTest.java | 6 + 13 files changed, 589 insertions(+), 156 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index 8c680e8..fa925c4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -77,6 +77,9 @@ public class FunctionConfig { private String output; + // Any configuration that need to be applied for producers + private ProducerConfig producerConfig; + /** * Represents either a builtin schema type (eg: 'avro', 'json', ect) or the class name for a Schema * implementation. diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java similarity index 59% copy from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java copy to pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java index 4e47812..b28370e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java @@ -16,24 +16,23 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.sink; +package org.apache.pulsar.common.functions; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.apache.pulsar.common.functions.FunctionConfig; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; -import java.util.Map; - -@Getter -@Setter -@ToString -public class PulsarSinkConfig { - private FunctionConfig.ProcessingGuarantees processingGuarantees; - private String topic; - private String serdeClassName; - private String schemaType; - private Map<String, String> schemaProperties; - private String typeClassName; - private boolean forwardSourceMessageProperty; +/** + * Configuration of the producer inside the function. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode +public class ProducerConfig { + private Integer maxPendingMessages; + private Integer maxPendingMessagesAcrossPartitions; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java index b3a5634..31a8634 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java @@ -24,6 +24,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.functions.Resources; /** @@ -41,6 +42,8 @@ public class SourceConfig { private String topicName; + private ProducerConfig producerConfig; + private String serdeClassName; private String schemaType; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index c61aee0..763d9f7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -108,6 +108,14 @@ class ContextImpl implements Context, SinkContext, SourceContext { this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true) .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS); + if (config.getFunctionDetails().getSink().getProducerSpec() != null) { + if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) { + this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages()); + } + if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) { + this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions()); + } + } if (config.getFunctionDetails().getUserConfig().isEmpty()) { userConfigs = new HashMap<>(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 29cb71f..45a8174 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -812,6 +812,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName()); pulsarSinkConfig.setSchemaProperties(sinkSpec.getSchemaPropertiesMap()); + if (this.instanceConfig.getFunctionDetails().getSink().getProducerSpec() != null) { + pulsarSinkConfig.setProducerSpec(this.instanceConfig.getFunctionDetails().getSink().getProducerSpec()); + } + object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader); } } else { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 9aa8bc1..00d12c1 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -103,6 +103,14 @@ public class PulsarSink<T> implements Sink<T> { if (producerName != null) { builder.producerName(producerName); } + if (pulsarSinkConfig.getProducerSpec() != null) { + if (pulsarSinkConfig.getProducerSpec().getMaxPendingMessages() != 0) { + builder.maxPendingMessages(pulsarSinkConfig.getProducerSpec().getMaxPendingMessages()); + } + if (pulsarSinkConfig.getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) { + builder.maxPendingMessagesAcrossPartitions(pulsarSinkConfig.getProducerSpec().getMaxPendingMessagesAcrossPartitions()); + } + } return builder.properties(properties).create(); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java index 4e47812..a4ba7e3 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java @@ -22,6 +22,7 @@ import lombok.Getter; import lombok.Setter; import lombok.ToString; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.functions.proto.Function; import java.util.Map; @@ -36,4 +37,5 @@ public class PulsarSinkConfig { private Map<String, String> schemaProperties; private String typeClassName; private boolean forwardSourceMessageProperty; + private Function.ProducerSpec producerSpec; } diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py index 108ed79..203809d 100644 --- a/pulsar-functions/instance/src/main/python/Function_pb2.py +++ b/pulsar-functions/instance/src/main/python/Function_pb2.py @@ -17,7 +17,6 @@ # under the License. # -# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: Function.proto @@ -38,7 +37,8 @@ DESCRIPTOR = _descriptor.FileDescriptor( package='proto', syntax='proto3', serialized_options=b'\n!org.apache.pulsar.functions.protoB\010Function', - serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xa3\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...] + create_key=_descriptor._internal_create_key, + serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe7\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...] ) _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( @@ -46,24 +46,28 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( full_name='proto.ProcessingGuarantees', filename=None, file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, values=[ _descriptor.EnumValueDescriptor( name='ATLEAST_ONCE', index=0, number=0, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), _descriptor.EnumValueDescriptor( name='ATMOST_ONCE', index=1, number=1, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), _descriptor.EnumValueDescriptor( name='EFFECTIVELY_ONCE', index=2, number=2, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), ], containing_type=None, serialized_options=None, - serialized_start=2429, - serialized_end=2508, + serialized_start=3174, + serialized_end=3253, ) _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES) @@ -73,20 +77,28 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor( full_name='proto.SubscriptionType', filename=None, file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, values=[ _descriptor.EnumValueDescriptor( name='SHARED', index=0, number=0, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), _descriptor.EnumValueDescriptor( name='FAILOVER', index=1, number=1, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), + _descriptor.EnumValueDescriptor( + name='KEY_SHARED', index=2, number=2, + serialized_options=None, + type=None, + create_key=_descriptor._internal_create_key), ], containing_type=None, serialized_options=None, - serialized_start=2510, - serialized_end=2554, + serialized_start=3255, + serialized_end=3315, ) _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE) @@ -96,20 +108,23 @@ _SUBSCRIPTIONPOSITION = _descriptor.EnumDescriptor( full_name='proto.SubscriptionPosition', filename=None, file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, values=[ _descriptor.EnumValueDescriptor( name='LATEST', index=0, number=0, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), _descriptor.EnumValueDescriptor( name='EARLIEST', index=1, number=1, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), ], containing_type=None, serialized_options=None, - serialized_start=2556, - serialized_end=2604, + serialized_start=3317, + serialized_end=3365, ) _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONPOSITION) @@ -119,20 +134,23 @@ _FUNCTIONSTATE = _descriptor.EnumDescriptor( full_name='proto.FunctionState', filename=None, file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, values=[ _descriptor.EnumValueDescriptor( name='RUNNING', index=0, number=0, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), _descriptor.EnumValueDescriptor( name='STOPPED', index=1, number=1, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), ], containing_type=None, serialized_options=None, - serialized_start=2606, - serialized_end=2647, + serialized_start=3367, + serialized_end=3408, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONSTATE) @@ -142,6 +160,7 @@ ATMOST_ONCE = 1 EFFECTIVELY_ONCE = 2 SHARED = 0 FAILOVER = 1 +KEY_SHARED = 2 LATEST = 0 EARLIEST = 1 RUNNING = 0 @@ -153,24 +172,28 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor( full_name='proto.FunctionDetails.Runtime', filename=None, file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, values=[ _descriptor.EnumValueDescriptor( name='JAVA', index=0, number=0, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), _descriptor.EnumValueDescriptor( name='PYTHON', index=1, number=1, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), _descriptor.EnumValueDescriptor( name='GO', index=2, number=3, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), ], containing_type=None, serialized_options=None, - serialized_start=717, - serialized_end=756, + serialized_start=785, + serialized_end=824, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME) @@ -179,28 +202,33 @@ _FUNCTIONDETAILS_COMPONENTTYPE = _descriptor.EnumDescriptor( full_name='proto.FunctionDetails.ComponentType', filename=None, file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, values=[ _descriptor.EnumValueDescriptor( name='UNKNOWN', index=0, number=0, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), _descriptor.EnumValueDescriptor( name='FUNCTION', index=1, number=1, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), _descriptor.EnumValueDescriptor( name='SOURCE', index=2, number=2, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), _descriptor.EnumValueDescriptor( name='SINK', index=3, number=3, serialized_options=None, - type=None), + type=None, + create_key=_descriptor._internal_create_key), ], containing_type=None, serialized_options=None, - serialized_start=758, - serialized_end=822, + serialized_start=826, + serialized_end=890, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_COMPONENTTYPE) @@ -211,6 +239,7 @@ _RESOURCES = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='cpu', full_name='proto.Resources.cpu', index=0, @@ -218,21 +247,21 @@ _RESOURCES = _descriptor.Descriptor( has_default_value=False, default_value=float(0), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='ram', full_name='proto.Resources.ram', index=1, number=2, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='disk', full_name='proto.Resources.disk', index=2, number=3, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -256,6 +285,7 @@ _RETRYDETAILS = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='maxMessageRetries', full_name='proto.RetryDetails.maxMessageRetries', index=0, @@ -263,14 +293,14 @@ _RETRYDETAILS = _descriptor.Descriptor( has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='deadLetterTopic', full_name='proto.RetryDetails.deadLetterTopic', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -294,6 +324,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='tenant', full_name='proto.FunctionDetails.tenant', index=0, @@ -301,133 +332,154 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='namespace', full_name='proto.FunctionDetails.namespace', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='name', full_name='proto.FunctionDetails.name', index=2, number=3, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='className', full_name='proto.FunctionDetails.className', index=3, number=4, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='logTopic', full_name='proto.FunctionDetails.logTopic', index=4, number=5, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='processingGuarantees', full_name='proto.FunctionDetails.processingGuarantees', index=5, number=6, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6, number=7, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='secretsMap', full_name='proto.FunctionDetails.secretsMap', index=7, number=16, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='runtime', full_name='proto.FunctionDetails.runtime', index=8, number=8, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='autoAck', full_name='proto.FunctionDetails.autoAck', index=9, number=9, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='parallelism', full_name='proto.FunctionDetails.parallelism', index=10, number=10, type=5, cpp_type=1, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='source', full_name='proto.FunctionDetails.source', index=11, number=11, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='sink', full_name='proto.FunctionDetails.sink', index=12, number=12, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='resources', full_name='proto.FunctionDetails.resources', index=13, number=13, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='packageUrl', full_name='proto.FunctionDetails.packageUrl', index=14, number=14, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='retryDetails', full_name='proto.FunctionDetails.retryDetails', index=15, number=15, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='runtimeFlags', full_name='proto.FunctionDetails.runtimeFlags', index=16, number=17, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='componentType', full_name='proto.FunctionDetails.componentType', index=17, number=18, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='customRuntimeOptions', full_name='proto.FunctionDetails.customRuntimeOptions', index=18, number=19, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='builtin', full_name='proto.FunctionDetails.builtin', index=19, + number=20, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='retainOrdering', full_name='proto.FunctionDetails.retainOrdering', index=20, + number=21, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='retainKeyOrdering', full_name='proto.FunctionDetails.retainKeyOrdering', index=21, + number=22, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -443,7 +495,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( oneofs=[ ], serialized_start=147, - serialized_end=822, + serialized_end=890, ) @@ -453,6 +505,7 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='value', full_name='proto.ConsumerSpec.ReceiverQueueSize.value', index=0, @@ -460,7 +513,7 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor( has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -473,8 +526,84 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=975, - serialized_end=1009, + serialized_start=1185, + serialized_end=1219, +) + +_CONSUMERSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor( + name='SchemaPropertiesEntry', + full_name='proto.ConsumerSpec.SchemaPropertiesEntry', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='key', full_name='proto.ConsumerSpec.SchemaPropertiesEntry.key', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='value', full_name='proto.ConsumerSpec.SchemaPropertiesEntry.value', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=b'8\001', + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1221, + serialized_end=1276, +) + +_CONSUMERSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor( + name='ConsumerPropertiesEntry', + full_name='proto.ConsumerSpec.ConsumerPropertiesEntry', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='key', full_name='proto.ConsumerSpec.ConsumerPropertiesEntry.key', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='value', full_name='proto.ConsumerSpec.ConsumerPropertiesEntry.value', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=b'8\001', + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1278, + serialized_end=1335, ) _CONSUMERSPEC = _descriptor.Descriptor( @@ -483,6 +612,7 @@ _CONSUMERSPEC = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='schemaType', full_name='proto.ConsumerSpec.schemaType', index=0, @@ -490,32 +620,85 @@ _CONSUMERSPEC = _descriptor.Descriptor( has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='serdeClassName', full_name='proto.ConsumerSpec.serdeClassName', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='isRegexPattern', full_name='proto.ConsumerSpec.isRegexPattern', index=2, number=3, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='receiverQueueSize', full_name='proto.ConsumerSpec.receiverQueueSize', index=3, number=4, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='schemaProperties', full_name='proto.ConsumerSpec.schemaProperties', index=4, + number=5, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='consumerProperties', full_name='proto.ConsumerSpec.consumerProperties', index=5, + number=6, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[_CONSUMERSPEC_RECEIVERQUEUESIZE, _CONSUMERSPEC_SCHEMAPROPERTIESENTRY, _CONSUMERSPEC_CONSUMERPROPERTIESENTRY, ], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=893, + serialized_end=1335, +) + + +_PRODUCERSPEC = _descriptor.Descriptor( + name='ProducerSpec', + full_name='proto.ProducerSpec', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='maxPendingMessages', full_name='proto.ProducerSpec.maxPendingMessages', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='maxPendingMessagesAcrossPartitions', full_name='proto.ProducerSpec.maxPendingMessagesAcrossPartitions', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], - nested_types=[_CONSUMERSPEC_RECEIVERQUEUESIZE, ], + nested_types=[], enum_types=[ ], serialized_options=None, @@ -524,8 +707,8 @@ _CONSUMERSPEC = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=825, - serialized_end=1009, + serialized_start=1337, + serialized_end=1423, ) @@ -535,6 +718,7 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='key', full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.key', index=0, @@ -542,14 +726,14 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor( has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='value', full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.value', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -562,8 +746,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1451, - serialized_end=1512, + serialized_start=1903, + serialized_end=1964, ) _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor( @@ -572,6 +756,7 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='key', full_name='proto.SourceSpec.InputSpecsEntry.key', index=0, @@ -579,14 +764,14 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor( has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='value', full_name='proto.SourceSpec.InputSpecsEntry.value', index=1, number=2, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -599,8 +784,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1514, - serialized_end=1584, + serialized_start=1966, + serialized_end=2036, ) _SOURCESPEC = _descriptor.Descriptor( @@ -609,6 +794,7 @@ _SOURCESPEC = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='className', full_name='proto.SourceSpec.className', index=0, @@ -616,84 +802,91 @@ _SOURCESPEC = _descriptor.Descriptor( has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='configs', full_name='proto.SourceSpec.configs', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='typeClassName', full_name='proto.SourceSpec.typeClassName', index=2, number=5, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=3, number=3, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=4, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=b'\030\001', file=DESCRIPTOR), + serialized_options=b'\030\001', file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='inputSpecs', full_name='proto.SourceSpec.inputSpecs', index=5, number=10, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='timeoutMs', full_name='proto.SourceSpec.timeoutMs', index=6, number=6, type=4, cpp_type=4, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='topicsPattern', full_name='proto.SourceSpec.topicsPattern', index=7, number=7, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=b'\030\001', file=DESCRIPTOR), + serialized_options=b'\030\001', file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='builtin', full_name='proto.SourceSpec.builtin', index=8, number=8, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='subscriptionName', full_name='proto.SourceSpec.subscriptionName', index=9, number=9, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='cleanupSubscription', full_name='proto.SourceSpec.cleanupSubscription', index=10, number=11, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='subscriptionPosition', full_name='proto.SourceSpec.subscriptionPosition', index=11, number=12, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='negativeAckRedeliveryDelayMs', full_name='proto.SourceSpec.negativeAckRedeliveryDelayMs', index=12, + number=13, type=4, cpp_type=4, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -706,17 +899,94 @@ _SOURCESPEC = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1012, - serialized_end=1584, + serialized_start=1426, + serialized_end=2036, ) +_SINKSPEC_SCHEMAPROPERTIESENTRY = _descriptor.Descriptor( + name='SchemaPropertiesEntry', + full_name='proto.SinkSpec.SchemaPropertiesEntry', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='key', full_name='proto.SinkSpec.SchemaPropertiesEntry.key', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='value', full_name='proto.SinkSpec.SchemaPropertiesEntry.value', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=b'8\001', + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1221, + serialized_end=1276, +) + +_SINKSPEC_CONSUMERPROPERTIESENTRY = _descriptor.Descriptor( + name='ConsumerPropertiesEntry', + full_name='proto.SinkSpec.ConsumerPropertiesEntry', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='key', full_name='proto.SinkSpec.ConsumerPropertiesEntry.key', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='value', full_name='proto.SinkSpec.ConsumerPropertiesEntry.value', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=b'8\001', + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1278, + serialized_end=1335, +) + _SINKSPEC = _descriptor.Descriptor( name='SinkSpec', full_name='proto.SinkSpec', filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='className', full_name='proto.SinkSpec.className', index=0, @@ -724,60 +994,81 @@ _SINKSPEC = _descriptor.Descriptor( has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='configs', full_name='proto.SinkSpec.configs', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='typeClassName', full_name='proto.SinkSpec.typeClassName', index=2, number=5, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='topic', full_name='proto.SinkSpec.topic', index=3, number=3, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='producerSpec', full_name='proto.SinkSpec.producerSpec', index=4, + number=11, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( - name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=4, + name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=5, number=4, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( - name='builtin', full_name='proto.SinkSpec.builtin', index=5, + name='builtin', full_name='proto.SinkSpec.builtin', index=6, number=6, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( - name='schemaType', full_name='proto.SinkSpec.schemaType', index=6, + name='schemaType', full_name='proto.SinkSpec.schemaType', index=7, number=7, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( - name='forwardSourceMessageProperty', full_name='proto.SinkSpec.forwardSourceMessageProperty', index=7, + name='forwardSourceMessageProperty', full_name='proto.SinkSpec.forwardSourceMessageProperty', index=8, number=8, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='schemaProperties', full_name='proto.SinkSpec.schemaProperties', index=9, + number=9, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='consumerProperties', full_name='proto.SinkSpec.consumerProperties', index=10, + number=10, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], - nested_types=[], + nested_types=[_SINKSPEC_SCHEMAPROPERTIESENTRY, _SINKSPEC_CONSUMERPROPERTIESENTRY, ], enum_types=[ ], serialized_options=None, @@ -786,8 +1077,8 @@ _SINKSPEC = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1587, - serialized_end=1770, + serialized_start=2039, + serialized_end=2515, ) @@ -797,6 +1088,7 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='packagePath', full_name='proto.PackageLocationMetaData.packagePath', index=0, @@ -804,14 +1096,14 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor( has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='originalFileName', full_name='proto.PackageLocationMetaData.originalFileName', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -824,8 +1116,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1772, - serialized_end=1844, + serialized_start=2517, + serialized_end=2589, ) @@ -835,6 +1127,7 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='key', full_name='proto.FunctionMetaData.InstanceStatesEntry.key', index=0, @@ -842,14 +1135,14 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = _descriptor.Descriptor( has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='value', full_name='proto.FunctionMetaData.InstanceStatesEntry.value', index=1, number=2, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -862,8 +1155,8 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2140, - serialized_end=2215, + serialized_start=2885, + serialized_end=2960, ) _FUNCTIONMETADATA = _descriptor.Descriptor( @@ -872,6 +1165,7 @@ _FUNCTIONMETADATA = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='functionDetails', full_name='proto.FunctionMetaData.functionDetails', index=0, @@ -879,42 +1173,42 @@ _FUNCTIONMETADATA = _descriptor.Descriptor( has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='packageLocation', full_name='proto.FunctionMetaData.packageLocation', index=1, number=2, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='version', full_name='proto.FunctionMetaData.version', index=2, number=3, type=4, cpp_type=4, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='createTime', full_name='proto.FunctionMetaData.createTime', index=3, number=4, type=4, cpp_type=4, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='instanceStates', full_name='proto.FunctionMetaData.instanceStates', index=4, number=5, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='functionAuthSpec', full_name='proto.FunctionMetaData.functionAuthSpec', index=5, number=6, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -927,8 +1221,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1847, - serialized_end=2215, + serialized_start=2592, + serialized_end=2960, ) @@ -938,6 +1232,7 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='data', full_name='proto.FunctionAuthenticationSpec.data', index=0, @@ -945,14 +1240,14 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor( has_default_value=False, default_value=b"", message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='provider', full_name='proto.FunctionAuthenticationSpec.provider', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -965,8 +1260,8 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2217, - serialized_end=2277, + serialized_start=2962, + serialized_end=3022, ) @@ -976,6 +1271,7 @@ _INSTANCE = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='functionMetaData', full_name='proto.Instance.functionMetaData', index=0, @@ -983,14 +1279,14 @@ _INSTANCE = _descriptor.Descriptor( has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='instanceId', full_name='proto.Instance.instanceId', index=1, number=2, type=5, cpp_type=1, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -1003,8 +1299,8 @@ _INSTANCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2279, - serialized_end=2360, + serialized_start=3024, + serialized_end=3105, ) @@ -1014,6 +1310,7 @@ _ASSIGNMENT = _descriptor.Descriptor( filename=None, file=DESCRIPTOR, containing_type=None, + create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( name='instance', full_name='proto.Assignment.instance', index=0, @@ -1021,14 +1318,14 @@ _ASSIGNMENT = _descriptor.Descriptor( has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( name='workerId', full_name='proto.Assignment.workerId', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR), + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -1041,8 +1338,8 @@ _ASSIGNMENT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2362, - serialized_end=2427, + serialized_start=3107, + serialized_end=3172, ) _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES @@ -1055,7 +1352,11 @@ _FUNCTIONDETAILS.fields_by_name['componentType'].enum_type = _FUNCTIONDETAILS_CO _FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS _FUNCTIONDETAILS_COMPONENTTYPE.containing_type = _FUNCTIONDETAILS _CONSUMERSPEC_RECEIVERQUEUESIZE.containing_type = _CONSUMERSPEC +_CONSUMERSPEC_SCHEMAPROPERTIESENTRY.containing_type = _CONSUMERSPEC +_CONSUMERSPEC_CONSUMERPROPERTIESENTRY.containing_type = _CONSUMERSPEC _CONSUMERSPEC.fields_by_name['receiverQueueSize'].message_type = _CONSUMERSPEC_RECEIVERQUEUESIZE +_CONSUMERSPEC.fields_by_name['schemaProperties'].message_type = _CONSUMERSPEC_SCHEMAPROPERTIESENTRY +_CONSUMERSPEC.fields_by_name['consumerProperties'].message_type = _CONSUMERSPEC_CONSUMERPROPERTIESENTRY _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC _SOURCESPEC_INPUTSPECSENTRY.fields_by_name['value'].message_type = _CONSUMERSPEC _SOURCESPEC_INPUTSPECSENTRY.containing_type = _SOURCESPEC @@ -1063,6 +1364,11 @@ _SOURCESPEC.fields_by_name['subscriptionType'].enum_type = _SUBSCRIPTIONTYPE _SOURCESPEC.fields_by_name['topicsToSerDeClassName'].message_type = _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY _SOURCESPEC.fields_by_name['inputSpecs'].message_type = _SOURCESPEC_INPUTSPECSENTRY _SOURCESPEC.fields_by_name['subscriptionPosition'].enum_type = _SUBSCRIPTIONPOSITION +_SINKSPEC_SCHEMAPROPERTIESENTRY.containing_type = _SINKSPEC +_SINKSPEC_CONSUMERPROPERTIESENTRY.containing_type = _SINKSPEC +_SINKSPEC.fields_by_name['producerSpec'].message_type = _PRODUCERSPEC +_SINKSPEC.fields_by_name['schemaProperties'].message_type = _SINKSPEC_SCHEMAPROPERTIESENTRY +_SINKSPEC.fields_by_name['consumerProperties'].message_type = _SINKSPEC_CONSUMERPROPERTIESENTRY _FUNCTIONMETADATA_INSTANCESTATESENTRY.fields_by_name['value'].enum_type = _FUNCTIONSTATE _FUNCTIONMETADATA_INSTANCESTATESENTRY.containing_type = _FUNCTIONMETADATA _FUNCTIONMETADATA.fields_by_name['functionDetails'].message_type = _FUNCTIONDETAILS @@ -1075,6 +1381,7 @@ DESCRIPTOR.message_types_by_name['Resources'] = _RESOURCES DESCRIPTOR.message_types_by_name['RetryDetails'] = _RETRYDETAILS DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS DESCRIPTOR.message_types_by_name['ConsumerSpec'] = _CONSUMERSPEC +DESCRIPTOR.message_types_by_name['ProducerSpec'] = _PRODUCERSPEC DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC DESCRIPTOR.message_types_by_name['SinkSpec'] = _SINKSPEC DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = _PACKAGELOCATIONMETADATA @@ -1117,12 +1424,35 @@ ConsumerSpec = _reflection.GeneratedProtocolMessageType('ConsumerSpec', (_messag # @@protoc_insertion_point(class_scope:proto.ConsumerSpec.ReceiverQueueSize) }) , + + 'SchemaPropertiesEntry' : _reflection.GeneratedProtocolMessageType('SchemaPropertiesEntry', (_message.Message,), { + 'DESCRIPTOR' : _CONSUMERSPEC_SCHEMAPROPERTIESENTRY, + '__module__' : 'Function_pb2' + # @@protoc_insertion_point(class_scope:proto.ConsumerSpec.SchemaPropertiesEntry) + }) + , + + 'ConsumerPropertiesEntry' : _reflection.GeneratedProtocolMessageType('ConsumerPropertiesEntry', (_message.Message,), { + 'DESCRIPTOR' : _CONSUMERSPEC_CONSUMERPROPERTIESENTRY, + '__module__' : 'Function_pb2' + # @@protoc_insertion_point(class_scope:proto.ConsumerSpec.ConsumerPropertiesEntry) + }) + , 'DESCRIPTOR' : _CONSUMERSPEC, '__module__' : 'Function_pb2' # @@protoc_insertion_point(class_scope:proto.ConsumerSpec) }) _sym_db.RegisterMessage(ConsumerSpec) _sym_db.RegisterMessage(ConsumerSpec.ReceiverQueueSize) +_sym_db.RegisterMessage(ConsumerSpec.SchemaPropertiesEntry) +_sym_db.RegisterMessage(ConsumerSpec.ConsumerPropertiesEntry) + +ProducerSpec = _reflection.GeneratedProtocolMessageType('ProducerSpec', (_message.Message,), { + 'DESCRIPTOR' : _PRODUCERSPEC, + '__module__' : 'Function_pb2' + # @@protoc_insertion_point(class_scope:proto.ProducerSpec) + }) +_sym_db.RegisterMessage(ProducerSpec) SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', (_message.Message,), { @@ -1148,11 +1478,27 @@ _sym_db.RegisterMessage(SourceSpec.TopicsToSerDeClassNameEntry) _sym_db.RegisterMessage(SourceSpec.InputSpecsEntry) SinkSpec = _reflection.GeneratedProtocolMessageType('SinkSpec', (_message.Message,), { + + 'SchemaPropertiesEntry' : _reflection.GeneratedProtocolMessageType('SchemaPropertiesEntry', (_message.Message,), { + 'DESCRIPTOR' : _SINKSPEC_SCHEMAPROPERTIESENTRY, + '__module__' : 'Function_pb2' + # @@protoc_insertion_point(class_scope:proto.SinkSpec.SchemaPropertiesEntry) + }) + , + + 'ConsumerPropertiesEntry' : _reflection.GeneratedProtocolMessageType('ConsumerPropertiesEntry', (_message.Message,), { + 'DESCRIPTOR' : _SINKSPEC_CONSUMERPROPERTIESENTRY, + '__module__' : 'Function_pb2' + # @@protoc_insertion_point(class_scope:proto.SinkSpec.ConsumerPropertiesEntry) + }) + , 'DESCRIPTOR' : _SINKSPEC, '__module__' : 'Function_pb2' # @@protoc_insertion_point(class_scope:proto.SinkSpec) }) _sym_db.RegisterMessage(SinkSpec) +_sym_db.RegisterMessage(SinkSpec.SchemaPropertiesEntry) +_sym_db.RegisterMessage(SinkSpec.ConsumerPropertiesEntry) PackageLocationMetaData = _reflection.GeneratedProtocolMessageType('PackageLocationMetaData', (_message.Message,), { 'DESCRIPTOR' : _PACKAGELOCATIONMETADATA, @@ -1199,9 +1545,13 @@ _sym_db.RegisterMessage(Assignment) DESCRIPTOR._options = None +_CONSUMERSPEC_SCHEMAPROPERTIESENTRY._options = None +_CONSUMERSPEC_CONSUMERPROPERTIESENTRY._options = None _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = None _SOURCESPEC_INPUTSPECSENTRY._options = None _SOURCESPEC.fields_by_name['topicsToSerDeClassName']._options = None _SOURCESPEC.fields_by_name['topicsPattern']._options = None +_SINKSPEC_SCHEMAPROPERTIESENTRY._options = None +_SINKSPEC_CONSUMERPROPERTIESENTRY._options = None _FUNCTIONMETADATA_INSTANCESTATESENTRY._options = None # @@protoc_insertion_point(module_scope) diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index f39fdfd..68cc936 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -101,6 +101,11 @@ message ConsumerSpec { map<string, string> consumerProperties = 6; } +message ProducerSpec { + int32 maxPendingMessages = 1; + int32 maxPendingMessagesAcrossPartitions = 2; +} + message SourceSpec { string className = 1; // map in json format @@ -138,6 +143,7 @@ message SinkSpec { // configs used only when functions output to sink string topic = 3; + ProducerSpec producerSpec = 11; string serDeClassName = 4; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 342ba27..1b0e4fd 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -25,10 +25,7 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; -import org.apache.pulsar.common.functions.ConsumerConfig; -import org.apache.pulsar.common.functions.FunctionConfig; -import org.apache.pulsar.common.functions.Resources; -import org.apache.pulsar.common.functions.WindowConfig; +import org.apache.pulsar.common.functions.*; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.proto.Function; @@ -198,6 +195,16 @@ public class FunctionConfigUtils { if (typeArgs != null) { sinkSpecBuilder.setTypeClassName(typeArgs[1].getName()); } + if (functionConfig.getProducerConfig() != null) { + Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder(); + if (functionConfig.getProducerConfig().getMaxPendingMessages() != null) { + pbldr.setMaxPendingMessages(functionConfig.getProducerConfig().getMaxPendingMessages()); + } + if (functionConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions() != null) { + pbldr.setMaxPendingMessagesAcrossPartitions(functionConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions()); + } + sinkSpecBuilder.setProducerSpec(pbldr.build()); + } functionDetailsBuilder.setSink(sinkSpecBuilder); if (functionConfig.getTenant() != null) { @@ -343,6 +350,16 @@ public class FunctionConfigUtils { if (!isEmpty(functionDetails.getSink().getSchemaType())) { functionConfig.setOutputSchemaType(functionDetails.getSink().getSchemaType()); } + if (functionDetails.getSink().getProducerSpec() != null) { + ProducerConfig producerConfig = new ProducerConfig(); + if (functionDetails.getSink().getProducerSpec().getMaxPendingMessages() != 0) { + producerConfig.setMaxPendingMessages(functionDetails.getSink().getProducerSpec().getMaxPendingMessages()); + } + if (functionDetails.getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) { + producerConfig.setMaxPendingMessagesAcrossPartitions(functionDetails.getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions()); + } + functionConfig.setProducerConfig(producerConfig); + } if (!isEmpty(functionDetails.getLogTopic())) { functionConfig.setLogTopic(functionDetails.getLogTopic()); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java index a246e53..d912206 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java @@ -28,6 +28,7 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.BatchSourceConfig; import org.apache.pulsar.common.io.ConnectorDefinition; @@ -146,6 +147,17 @@ public class SourceConfigUtils { sinkSpecBuilder.setTypeClassName(sourceDetails.getTypeArg()); } + if (sourceConfig.getProducerConfig() != null) { + Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder(); + if (sourceConfig.getProducerConfig().getMaxPendingMessages() != null) { + pbldr.setMaxPendingMessages(sourceConfig.getProducerConfig().getMaxPendingMessages()); + } + if (sourceConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions() != null) { + pbldr.setMaxPendingMessagesAcrossPartitions(sourceConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions()); + } + sinkSpecBuilder.setProducerSpec(pbldr.build()); + } + functionDetailsBuilder.setSink(sinkSpecBuilder); // use default resources if resources not set @@ -215,6 +227,16 @@ public class SourceConfigUtils { if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) { sourceConfig.setSerdeClassName(sinkSpec.getSerDeClassName()); } + if (sinkSpec.getProducerSpec() != null) { + ProducerConfig producerConfig = new ProducerConfig(); + if (sinkSpec.getProducerSpec().getMaxPendingMessages() != 0) { + producerConfig.setMaxPendingMessages(sinkSpec.getProducerSpec().getMaxPendingMessages()); + } + if (sinkSpec.getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) { + producerConfig.setMaxPendingMessagesAcrossPartitions(sinkSpec.getProducerSpec().getMaxPendingMessagesAcrossPartitions()); + } + sourceConfig.setProducerConfig(producerConfig); + } if (functionDetails.hasResources()) { Resources resources = new Resources(); resources.setCpu(functionDetails.getResources().getCpu()); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java index 9d8c0bc..e604980 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java @@ -20,10 +20,7 @@ package org.apache.pulsar.functions.utils; import com.google.gson.Gson; import org.apache.pulsar.client.impl.schema.JSONSchema; -import org.apache.pulsar.common.functions.ConsumerConfig; -import org.apache.pulsar.common.functions.FunctionConfig; -import org.apache.pulsar.common.functions.Resources; -import org.apache.pulsar.common.functions.WindowConfig; +import org.apache.pulsar.common.functions.*; import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; @@ -66,6 +63,10 @@ public class FunctionConfigUtilsTest { functionConfig.setAutoAck(true); functionConfig.setTimeoutMs(2000l); functionConfig.setRuntimeFlags("-DKerberos"); + ProducerConfig producerConfig = new ProducerConfig(); + producerConfig.setMaxPendingMessages(100); + producerConfig.setMaxPendingMessagesAcrossPartitions(1000); + functionConfig.setProducerConfig(producerConfig); Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null); FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails); @@ -101,6 +102,10 @@ public class FunctionConfigUtilsTest { functionConfig.setAutoAck(true); functionConfig.setTimeoutMs(2000l); functionConfig.setWindowConfig(new WindowConfig().setWindowLengthCount(10)); + ProducerConfig producerConfig = new ProducerConfig(); + producerConfig.setMaxPendingMessages(100); + producerConfig.setMaxPendingMessagesAcrossPartitions(1000); + functionConfig.setProducerConfig(producerConfig); Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null); FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails); diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java index d517026..65f29ef 100644 --- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java @@ -23,6 +23,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.ProducerConfig; import org.apache.pulsar.common.functions.Resources; import org.apache.pulsar.common.io.BatchSourceConfig; import org.apache.pulsar.common.io.ConnectorDefinition; @@ -349,6 +350,11 @@ public class SourceConfigUtilsTest extends PowerMockTestCase { configs.put("bootstrapServers", "server-1,server-2"); configs.put("consumerConfigProperties", consumerConfigs); + ProducerConfig producerConfig = new ProducerConfig(); + producerConfig.setMaxPendingMessages(100); + producerConfig.setMaxPendingMessagesAcrossPartitions(1000); + sourceConfig.setProducerConfig(producerConfig); + sourceConfig.setConfigs(configs); return sourceConfig; }