This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7b239ec Clean up and correct properties to producer and consumers created by Functions/Sinks/Sources (#3315) 7b239ec is described below commit 7b239ec838f36ef2d61a817f858d0b4711b6c219 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon Jan 7 14:01:46 2019 -0800 Clean up and correct properties to producer and consumers created by Functions/Sinks/Sources (#3315) * clean up and correct properties to producer and consumers created by functions * fix test * cleaning up comment * refactoring --- .../pulsar/functions/instance/ContextImpl.java | 13 +++++++++++- .../pulsar/functions/instance/InstanceUtils.java | 22 ++++++++++++++++++++ .../functions/instance/JavaInstanceRunnable.java | 20 ++++++++++++------ .../apache/pulsar/functions/sink/PulsarSink.java | 19 ++++++++--------- .../pulsar/functions/source/PulsarSource.java | 13 +++++------- .../instance/src/main/python/contextimpl.py | 7 ++++++- .../instance/src/main/python/python_instance.py | 24 ++++++++++++++++++---- pulsar-functions/instance/src/main/python/util.py | 6 ++++++ .../pulsar/functions/sink/PulsarSinkTest.java | 18 ++++++++-------- .../pulsar/functions/source/PulsarSourceTest.java | 11 +++++----- 10 files changed, 110 insertions(+), 43 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 460ba75..97956b8 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -41,6 +41,7 @@ import org.apache.pulsar.functions.instance.stats.SourceStatsManager; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.source.TopicSchema; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; @@ -95,6 +96,7 @@ class ContextImpl implements Context, SinkContext, SourceContext { userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1); userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric"; } + private final Utils.ComponentType componentType; public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels, @@ -148,6 +150,7 @@ class ContextImpl implements Context, SinkContext, SourceContext { .quantile(0.99, 0.01) .quantile(0.999, 0.01) .register(collectorRegistry); + this.componentType = componentType; } public void setCurrentMessageContext(Record<?> record) { @@ -307,7 +310,15 @@ class ContextImpl implements Context, SinkContext, SourceContext { if (producer == null) { try { Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone()) - .schema(schema).topic(topicName).create(); + .schema(schema) + .topic(topicName) + .properties(InstanceUtils.getProperties(componentType, + FunctionDetailsUtils.getFullyQualifiedName( + this.config.getFunctionDetails().getTenant(), + this.config.getFunctionDetails().getNamespace(), + this.config.getFunctionDetails().getName()), + this.config.getInstanceId())) + .create(); Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java index 9e32736..88a9df3 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -36,6 +36,9 @@ import org.apache.pulsar.functions.utils.Reflections; import net.jodah.typetools.TypeResolver; import org.apache.pulsar.functions.utils.Utils; +import java.util.HashMap; +import java.util.Map; + @UtilityClass public class InstanceUtils { public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg, @@ -103,4 +106,23 @@ public class InstanceUtils { } return SINK; } + + public static Map<String, String> getProperties(Utils.ComponentType componentType, + String fullyQualifiedName, int instanceId) { + Map<String, String> properties = new HashMap<>(); + switch (componentType) { + case FUNCTION: + properties.put("application", "pulsar-function"); + break; + case SOURCE: + properties.put("application", "pulsar-source"); + break; + case SINK: + properties.put("application", "pulsar-sink"); + break; + } + properties.put("id", fullyQualifiedName); + properties.put("instance_id", String.valueOf(instanceId)); + return properties; + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 7c36b58..fde9628 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -132,6 +132,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private InstanceCache instanceCache; + private final Utils.ComponentType componentType; + + private final Map<String, String> properties; + public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, String jarFile, @@ -156,6 +160,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()) }; + this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails()); + + this.properties = InstanceUtils.getProperties(this.componentType, + FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()), + this.instanceConfig.getInstanceId()); + // Declare function local collector registry so that it will not clash with other function instances' // metrics collection especially in threaded mode // In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down @@ -205,7 +215,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { Logger instanceLog = LoggerFactory.getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider, - collectorRegistry, metricsLabels, InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails())); + collectorRegistry, metricsLabels, this.componentType); } /** @@ -221,7 +231,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.getScheduledExecutorService(), - InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails())); + this.componentType); ContextImpl contextImpl = setupContext(); javaInstance = setupJavaInstance(contextImpl); @@ -648,8 +658,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries()); pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic()); } - object = new PulsarSource(this.client, pulsarSourceConfig, - FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); + object = new PulsarSource(this.client, pulsarSourceConfig, this.properties); } else { object = Reflections.createInstance( sourceSpec.getClassName(), @@ -695,8 +704,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName()); - object = new PulsarSink(this.client, pulsarSinkConfig, - FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails())); + object = new PulsarSink(this.client, pulsarSinkConfig, this.properties); } } else { object = Reflections.createInstance( diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 1c61b45..3195a82 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -32,11 +32,13 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.FunctionResultRouter; +import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.instance.SinkRecord; import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.SinkContext; @@ -54,12 +56,12 @@ public class PulsarSink<T> implements Sink<T> { private final PulsarClient client; private final PulsarSinkConfig pulsarSinkConfig; + private final Map<String, String> properties; @VisibleForTesting PulsarSinkProcessor<T> pulsarSinkProcessor; private final TopicSchema topicSchema; - private final String fqfn; private interface PulsarSinkProcessor<T> { @@ -78,7 +80,7 @@ public class PulsarSink<T> implements Sink<T> { this.schema = schema; } - public <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema, String fqfn) + public <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema) throws PulsarClientException { ProducerBuilder<T> builder = client.newProducer(schema) .blockIfQueueFull(true) @@ -96,9 +98,7 @@ public class PulsarSink<T> implements Sink<T> { builder.producerName(producerName); } - return builder - .property("application", "pulsarfunction") - .property("fqfn", fqfn).create(); + return builder.properties(properties).create(); } protected Producer<T> getProducer(String destinationTopic) { @@ -112,8 +112,7 @@ public class PulsarSink<T> implements Sink<T> { client, topicName, producerName, - schema, - fqfn); + schema); } catch (PulsarClientException e) { log.error("Failed to create Producer while doing user publish", e); throw new RuntimeException(e); @@ -143,7 +142,7 @@ public class PulsarSink<T> implements Sink<T> { // initialize default topic try { publishProducers.put(pulsarSinkConfig.getTopic(), - createProducer(client, pulsarSinkConfig.getTopic(), null, schema, fqfn)); + createProducer(client, pulsarSinkConfig.getTopic(), null, schema)); } catch (PulsarClientException e) { log.error("Failed to create Producer while doing user publish", e); throw new RuntimeException(e); } @@ -209,11 +208,11 @@ public class PulsarSink<T> implements Sink<T> { } } - public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, String fqfn) { + public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties) { this.client = client; this.pulsarSinkConfig = pulsarSinkConfig; this.topicSchema = new TopicSchema(client); - this.fqfn = fqfn; + this.properties = properties; } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index ff41dc8..869c706 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -36,7 +36,9 @@ import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.io.core.SourceContext; @@ -45,17 +47,16 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> private final PulsarClient pulsarClient; private final PulsarSourceConfig pulsarSourceConfig; + private final Map<String, String> properties; private List<String> inputTopics; private List<Consumer<T>> inputConsumers; private final TopicSchema topicSchema; - private final String fqfn; - public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, - String fqfn) { + public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties) { this.pulsarClient = pulsarClient; this.pulsarSourceConfig = pulsarConfig; this.topicSchema = new TopicSchema(pulsarClient); - this.fqfn = fqfn; + this.properties = properties; } @Override @@ -64,10 +65,6 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> log.info("Opening pulsar source with config: {}", pulsarSourceConfig); Map<String, ConsumerConfig<T>> configs = setupConsumerConfigs(); - Map<String, String> properties = new HashMap<>(); - properties.put("application", "pulsarfunction"); - properties.put("fqfn", fqfn); - inputConsumers = configs.entrySet().stream().map(e -> { String topic = e.getKey(); ConsumerConfig<T> conf = e.getValue(); diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 63332d0..6b56163 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -147,7 +147,12 @@ class ContextImpl(pulsar.Context): batching_enabled=True, batching_max_publish_delay_ms=1, max_pending_messages=100000, - compression_type=pulsar_compression_type + compression_type=pulsar_compression_type, + properties=util.get_properties(util.getFullyQualifiedFunctionName( + self.instance_config.function_details.tenant, + self.instance_config.function_details.namespace, + self.instance_config.function_details.name), + self.instance_config.instance_id) ) if serde_class_name not in self.publish_serializers: diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index d4c3da5..cfd9eae 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -122,6 +122,13 @@ class PythonInstance(object): subscription_name = str(self.instance_config.function_details.tenant) + "/" + \ str(self.instance_config.function_details.namespace) + "/" + \ str(self.instance_config.function_details.name) + + properties = util.get_properties(util.getFullyQualifiedFunctionName( + self.instance_config.function_details.tenant, + self.instance_config.function_details.namespace, + self.instance_config.function_details.name), + self.instance_config.instance_id) + for topic, serde in self.instance_config.function_details.source.topicsToSerDeClassName.items(): if not serde: serde_kclass = util.import_class(os.path.dirname(self.user_code), DEFAULT_SERIALIZER) @@ -133,7 +140,8 @@ class PythonInstance(object): str(topic), subscription_name, consumer_type=mode, message_listener=partial(self.message_listener, self.input_serdes[topic]), - unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None + unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None, + properties=properties ) for topic, consumer_conf in self.instance_config.function_details.source.inputSpecs.items(): @@ -148,14 +156,16 @@ class PythonInstance(object): re.compile(str(topic)), subscription_name, consumer_type=mode, message_listener=partial(self.message_listener, self.input_serdes[topic]), - unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None + unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None, + properties=properties ) else: self.consumers[topic] = self.pulsar_client.subscribe( str(topic), subscription_name, consumer_type=mode, message_listener=partial(self.message_listener, self.input_serdes[topic]), - unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None + unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None, + properties=properties ) function_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.className) @@ -271,7 +281,13 @@ class PythonInstance(object): # set send timeout to be infinity to prevent potential deadlock with consumer # that might happen when consumer is blocked due to unacked messages send_timeout_millis=0, - max_pending_messages=100000) + max_pending_messages=100000, + properties=util.get_properties(util.getFullyQualifiedFunctionName( + self.instance_config.function_details.tenant, + self.instance_config.function_details.namespace, + self.instance_config.function_details.name), + self.instance_config.instance_id) + ) def message_listener(self, serde, consumer, message): # increment number of received records from source diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py index 76f75bd..0978f39 100644 --- a/pulsar-functions/instance/src/main/python/util.py +++ b/pulsar-functions/instance/src/main/python/util.py @@ -68,6 +68,12 @@ def import_class_from_path(from_path, full_class_name): def getFullyQualifiedFunctionName(tenant, namespace, name): return "%s/%s/%s" % (tenant, namespace, name) +def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id): + return "%s/%s/%s:%s" % (tenant, namespace, name, instance_id) + +def get_properties(fullyQualifiedName, instanceId): + return {"application": "pulsar-function", "id": str(fullyQualifiedName), "instance_id": str(instanceId)} + class FixedTimer(): def __init__(self, t, hFunction): diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index fb3d2c2..53cfeb6 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.instance.SinkRecord; import org.apache.pulsar.functions.source.TopicSchema; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.SinkContext; import org.mockito.ArgumentMatcher; import org.testng.Assert; @@ -100,6 +101,7 @@ public class PulsarSinkTest { doReturn(producerBuilder).when(producerBuilder).topic(anyString()); doReturn(producerBuilder).when(producerBuilder).producerName(anyString()); doReturn(producerBuilder).when(producerBuilder).property(anyString(), anyString()); + doReturn(producerBuilder).when(producerBuilder).properties(any()); doReturn(producerBuilder).when(producerBuilder).sendTimeout(anyInt(), any()); CompletableFuture completableFuture = new CompletableFuture<>(); @@ -158,7 +160,7 @@ public class PulsarSinkTest { PulsarSinkConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(Void.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); try { Schema schema = pulsarSink.initializeSchema(); @@ -176,7 +178,7 @@ public class PulsarSinkTest { // set type to be inconsistent to that of SerDe pulsarConfig.setTypeClassName(Integer.class.getName()); pulsarConfig.setSerdeClassName(TestSerDe.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSink.initializeSchema(); fail("Should fail constructing java instance if function type is inconsistent with serde type"); @@ -198,7 +200,7 @@ public class PulsarSinkTest { PulsarSinkConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(String.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSink.initializeSchema(); @@ -217,7 +219,7 @@ public class PulsarSinkTest { // set type to void pulsarConfig.setTypeClassName(String.class.getName()); pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSink.initializeSchema(); @@ -233,7 +235,7 @@ public class PulsarSinkTest { // set type to void pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName()); pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName()); - PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSink.initializeSchema(); @@ -257,7 +259,7 @@ public class PulsarSinkTest { /** test At-least-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); - PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test"); + PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>()); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); @@ -314,7 +316,7 @@ public class PulsarSinkTest { /** test At-most-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE); - pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test"); + pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>()); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); @@ -371,7 +373,7 @@ public class PulsarSinkTest { /** test Effectively-once **/ pulsarClient = getPulsarClient(); pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE); - pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test"); + pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>()); pulsarSink.open(new HashMap<>(), mock(SinkContext.class)); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 8e59e00..88c9637 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -46,6 +46,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.SourceContext; import org.testng.annotations.Test; @@ -125,7 +126,7 @@ public class PulsarSourceTest { PulsarSourceConfig pulsarConfig = getPulsarConfigs(); // set type to void pulsarConfig.setTypeClassName(Void.class.getName()); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); @@ -151,7 +152,7 @@ public class PulsarSourceTest { topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build()); pulsarConfig.setTopicSchema(topicSerdeClassNameMap); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>()); try { pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); fail("Should fail constructing java instance if function type is inconsistent with serde type"); @@ -176,7 +177,7 @@ public class PulsarSourceTest { consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build()); pulsarConfig.setTopicSchema(consumerConfigs); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>()); pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); } @@ -192,7 +193,7 @@ public class PulsarSourceTest { consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build()); pulsarConfig.setTopicSchema(consumerConfigs); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>()); pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); } @@ -205,7 +206,7 @@ public class PulsarSourceTest { consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build()); pulsarConfig.setTopicSchema(consumerConfigs); - PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test"); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>()); pulsarSource.setupConsumerConfigs(); }