This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e9a1b9a additional refactoring to use source interface (#1681) e9a1b9a is described below commit e9a1b9a58b28687558b29705ea66047ef82fd05c Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon Apr 30 15:21:50 2018 -0700 additional refactoring to use source interface (#1681) * additional refactoring to use source interface * removing PulsarConstants * remove unnecessary import * removing sink message for now * addressing comments * adding null check --- .../pulsar/broker/admin/impl/FunctionsBase.java | 3 +- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 148 +++++----- .../org/apache/pulsar/functions/api/Context.java | 3 +- .../pulsar/functions/instance/ContextImpl.java | 2 +- .../pulsar/functions/instance/JavaInstance.java | 3 +- .../functions/instance/JavaInstanceRunnable.java | 29 +- .../instance/processors/AtLeastOnceProcessor.java | 5 +- .../instance/processors/AtMostOnceProcessor.java | 5 +- .../processors/EffectivelyOnceProcessor.java | 12 +- .../instance/processors/MessageProcessor.java | 28 +- .../instance/processors/MessageProcessorBase.java | 50 ++-- .../{instance => source}/PulsarConfig.java | 26 +- .../{instance => source}/PulsarRecord.java | 2 +- .../{instance => source}/PulsarSource.java | 63 ++++- .../instance/src/main/python/Function_pb2.py | 305 ++++++++++----------- .../instance/src/main/python/python_instance.py | 26 +- .../src/main/python/python_instance_main.py | 35 ++- .../instance/JavaInstanceRunnableTest.java | 45 --- .../functions/instance/JavaInstanceTest.java | 4 +- .../pulsar/functions/source/PulsarSourceTest.java | 136 +++++++++ .../proto/src/main/proto/Function.proto | 52 ++-- .../functions/proto/FunctionDetailsTest.java | 2 +- .../pulsar/functions/runtime/JavaInstanceMain.java | 56 ++-- .../pulsar/functions/runtime/ProcessRuntime.java | 55 +--- .../functions/runtime/ProcessRuntimeTest.java | 31 ++- .../pulsar/functions/utils/FunctionConfig.java | 12 +- .../functions/worker/rest/api/FunctionsImpl.java | 25 +- .../worker/rest/api/v2/FunctionApiV2Resource.java | 3 +- .../rest/api/v2/FunctionApiV2ResourceTest.java | 278 ++++++++----------- 29 files changed, 731 insertions(+), 713 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index e5c050e..80e98af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -237,11 +237,12 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi public Response triggerFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, + final @PathParam("topic") String topic, final @FormDataParam("data") String triggerValue, final @FormDataParam("dataStream") InputStream triggerStream) { return functions.triggerFunction( - tenant, namespace, functionName, triggerValue, triggerStream); + tenant, namespace, functionName, topic, triggerValue, triggerStream); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 92bd757..e2f0a71 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -23,12 +23,15 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.isNull; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.ObjectOutputStream; import java.lang.reflect.Type; import java.net.MalformedURLException; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -49,7 +52,7 @@ import org.apache.pulsar.client.admin.internal.FunctionsImpl; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.api.Function; -import org.apache.pulsar.functions.instance.PulsarSource; +import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; @@ -59,8 +62,11 @@ import org.apache.pulsar.functions.runtime.RuntimeSpawner; import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf; import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil; import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled; -import org.apache.pulsar.functions.shaded.proto.Function.ConnectorDetails; +import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec; import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.shaded.proto.Function.SubscriptionType; +import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees; +import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; @@ -323,7 +329,7 @@ public class CmdFunctions extends CmdBase { } private void doJavaSubmitChecks(FunctionConfig functionConfig) { - if (isNull(className)) { + if (isNull(functionConfig.getClassName())) { throw new IllegalArgumentException("You supplied a jar file but no main class"); } @@ -527,6 +533,68 @@ public class CmdFunctions extends CmdBase { return functionConfig.getCustomSerdeInputs().keySet().iterator().next(); } } + + protected FunctionDetails convert(FunctionConfig functionConfig) + throws IOException { + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + + // Setup source + Map<String, String> topicToSerDeClassNameMap = new HashMap<>(); + topicToSerDeClassNameMap.putAll(functionConfig.getCustomSerdeInputs()); + SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); + if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) { + sourceSpecBuilder.setClassName(PulsarSource.class.getName()); + } + functionConfig.getInputs().forEach(v -> topicToSerDeClassNameMap.put(v, "")); + sourceSpecBuilder.putAllTopicsToSerDeClassName(topicToSerDeClassNameMap); + if (functionConfig.getSubscriptionType() != null) { + sourceSpecBuilder + .setSubscriptionType(convertSubscriptionType(functionConfig.getSubscriptionType())); + } + functionDetailsBuilder.setSource(sourceSpecBuilder); + + if (functionConfig.getTenant() != null) { + functionDetailsBuilder.setTenant(functionConfig.getTenant()); + } + if (functionConfig.getNamespace() != null) { + functionDetailsBuilder.setNamespace(functionConfig.getNamespace()); + } + if (functionConfig.getName() != null) { + functionDetailsBuilder.setName(functionConfig.getName()); + } + if (functionConfig.getClassName() != null) { + functionDetailsBuilder.setClassName(functionConfig.getClassName()); + } + if (functionConfig.getOutput() != null) { + functionDetailsBuilder.setOutput(functionConfig.getOutput()); + } + if (functionConfig.getOutputSerdeClassName() != null) { + functionDetailsBuilder.setOutputSerdeClassName(functionConfig.getOutputSerdeClassName()); + } + if (functionConfig.getLogTopic() != null) { + functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic()); + } + if (functionConfig.getRuntime() != null) { + functionDetailsBuilder.setRuntime(convertRuntime(functionConfig.getRuntime())); + } + if (!functionConfig.getUserConfig().isEmpty()) { + functionDetailsBuilder.putAllUserConfig(functionConfig.getUserConfig()); + } + if (functionConfig.getProcessingGuarantees() != null) { + functionDetailsBuilder.setProcessingGuarantees( + convertProcessingGuarantee(functionConfig.getProcessingGuarantees())); + } + functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck()); + functionDetailsBuilder.setParallelism(functionConfig.getParallelism()); + return functionDetailsBuilder.build(); + } + + protected org.apache.pulsar.functions.proto.Function.FunctionDetails convertProto2(FunctionConfig functionConfig) + throws IOException { + org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder(); + Utils.mergeJson(FunctionsImpl.printJson(convert(functionConfig)), functionDetailsBuilder); + return functionDetailsBuilder.build(); + } } @Parameters(commandDescription = "Run the Pulsar Function locally (rather than deploying it to the Pulsar cluster)") @@ -871,64 +939,19 @@ public class CmdFunctions extends CmdBase { throw new IllegalArgumentException("You must specify one or more input topics for the function"); } } - - private org.apache.pulsar.functions.proto.Function.FunctionDetails convertProto2(FunctionConfig functionConfig) - throws IOException { - org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder(); - Utils.mergeJson(FunctionsImpl.printJson(convert(functionConfig)), functionDetailsBuilder); - return functionDetailsBuilder.build(); - } - private FunctionDetails convert(FunctionConfig functionConfig) - throws IOException { - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - if (functionConfig.getInputs() != null) { - functionDetailsBuilder.setTenant(functionConfig.getTenant()); - } - functionDetailsBuilder.setSource( - ConnectorDetails.newBuilder() - .setClassName(PulsarSource.class.getName()) - .build()); - if (functionConfig.getNamespace() != null) { - functionDetailsBuilder.setNamespace(functionConfig.getNamespace()); - } - if (functionConfig.getName() != null) { - functionDetailsBuilder.setName(functionConfig.getName()); - } - if (functionConfig.getClassName() != null) { - functionDetailsBuilder.setClassName(functionConfig.getClassName()); - } - functionDetailsBuilder.putAllCustomSerdeInputs(functionConfig.getCustomSerdeInputs()); - if (functionConfig.getOutputSerdeClassName() != null) { - functionDetailsBuilder.setOutputSerdeClassName(functionConfig.getOutputSerdeClassName()); - } - if (functionConfig.getOutput() != null) { - functionDetailsBuilder.setOutput(functionConfig.getOutput()); - } - if (functionConfig.getLogTopic() != null) { - functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic()); - } - if (functionConfig.getProcessingGuarantees() != null) { - functionDetailsBuilder.setProcessingGuarantees( - convertProcessingGuarantee(functionConfig.getProcessingGuarantees())); - } - functionDetailsBuilder.putAllUserConfig(functionConfig.getUserConfig()); - if (functionConfig.getSubscriptionType() != null) { - functionDetailsBuilder.setSubscriptionType( - convertSubscriptionType(functionConfig.getSubscriptionType())); - } - if (functionConfig.getRuntime() != null) { - functionDetailsBuilder.setRuntime(convertRuntime(functionConfig.getRuntime())); + private static FunctionDetails.Runtime convertRuntime(FunctionConfig.Runtime runtime) { + for (FunctionDetails.Runtime type : FunctionDetails.Runtime.values()) { + if (type.name().equals(runtime.name())) { + return type; + } } - functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck()); - functionDetailsBuilder.addAllInputs(functionConfig.getInputs()); - functionDetailsBuilder.setParallelism(functionConfig.getParallelism()); - return functionDetailsBuilder.build(); + throw new RuntimeException("Unrecognized runtime: " + runtime.name()); } - private static FunctionDetails.SubscriptionType convertSubscriptionType( + private static SubscriptionType convertSubscriptionType( FunctionConfig.SubscriptionType subscriptionType) { - for (FunctionDetails.SubscriptionType type : FunctionDetails.SubscriptionType.values()) { + for (SubscriptionType type : SubscriptionType.values()) { if (type.name().equals(subscriptionType.name())) { return type; } @@ -936,9 +959,9 @@ public class CmdFunctions extends CmdBase { throw new RuntimeException("Unrecognized subscription type: " + subscriptionType.name()); } - private static FunctionDetails.ProcessingGuarantees convertProcessingGuarantee( + private static ProcessingGuarantees convertProcessingGuarantee( FunctionConfig.ProcessingGuarantees processingGuarantees) { - for (FunctionDetails.ProcessingGuarantees type : FunctionDetails.ProcessingGuarantees.values()) { + for (ProcessingGuarantees type : ProcessingGuarantees.values()) { if (type.name().equals(processingGuarantees.name())) { return type; } @@ -946,15 +969,6 @@ public class CmdFunctions extends CmdBase { throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name()); } - private static FunctionDetails.Runtime convertRuntime(FunctionConfig.Runtime runtime) { - for (FunctionDetails.Runtime type : FunctionDetails.Runtime.values()) { - if (type.name().equals(runtime.name())) { - return type; - } - } - throw new RuntimeException("Unrecognized runtime: " + runtime.name()); - } - private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) { String[] args = fqfn.split("/"); if (args.length != 3) { diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index 653d176..01d8291 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -162,8 +162,7 @@ public interface Context { * By default acknowledgement management is done transparently by Pulsar Functions framework. * However users can disable that and do ack management by themselves by using this API. * @param messageId The messageId that needs to be acknowledged - * @param topic The topic name that the message belongs to that needs to be acknowledged * @return A future that completes when the framework is done acking the message */ - CompletableFuture<Void> ack(byte[] messageId, String topic); + CompletableFuture<Void> ack(byte[] messageId); } \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index 7a01d25..16a04f4 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -249,7 +249,7 @@ class ContextImpl implements Context { //TODO remove topic argument @Override - public CompletableFuture<Void> ack(byte[] messageId, String topic) { + public CompletableFuture<Void> ack(byte[] messageId) { MessageId actualMessageId = null; try { actualMessageId = MessageId.fromByteArray(messageId); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 67f69bf..b78f5fc 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -27,8 +27,7 @@ import org.apache.pulsar.connect.core.Source; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; -import java.util.Map; - +import org.apache.pulsar.functions.source.PulsarSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index d3853e2..e181204 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -58,6 +58,7 @@ import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.processors.MessageProcessor; import org.apache.pulsar.functions.proto.InstanceCommunication; +import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.instance.state.StateContextImpl; @@ -93,7 +94,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Exception deathException; @Getter(AccessLevel.PACKAGE) - private Map<String, SerDe> inputSerDe; private SerDe outputSerDe; @Getter(AccessLevel.PACKAGE) @@ -159,7 +159,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { // start the output producer processor.setupOutput(outputSerDe); // start the input consumer - processor.setupInput(inputSerDe); + processor.setupInput(typeArgs[0]); // start any log topic handler setupLogHandler(); @@ -414,31 +414,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } private void setupSerDe(Class<?>[] typeArgs, ClassLoader clsLoader) { - - this.inputSerDe = new HashMap<>(); - instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().forEach((k, v) -> this.inputSerDe.put(k, InstanceUtils.initializeSerDe(v, clsLoader, typeArgs[0]))); - for (String topicName : instanceConfig.getFunctionDetails().getInputsList()) { - this.inputSerDe.put(topicName, InstanceUtils.initializeDefaultSerDe(typeArgs[0])); - } - - if (Void.class.equals(typeArgs[0])) { - throw new RuntimeException("Input type of Pulsar Function cannot be Void"); - } - - for (SerDe serDe : inputSerDe.values()) { - if (serDe.getClass().getName().equals(DefaultSerDe.class.getName())) { - if (!DefaultSerDe.IsSupportedType(typeArgs[0])) { - throw new RuntimeException("Default Serde does not support " + typeArgs[0]); - } - } else { - Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); - if (!typeArgs[0].isAssignableFrom(inputSerdeTypeArgs[0])) { - throw new RuntimeException("Inconsistent types found between function input type and input serde type: " - + " function type = " + typeArgs[0] + " should be assignable from " + inputSerdeTypeArgs[0]); - } - } - } - if (!Void.class.equals(typeArgs[1])) { // return type is not `Void.class` if (instanceConfig.getFunctionDetails().getOutputSerdeClassName() == null || instanceConfig.getFunctionDetails().getOutputSerdeClassName().isEmpty() diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java index 465d198..d51ae02 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java @@ -40,9 +40,8 @@ public class AtLeastOnceProcessor extends MessageProcessorBase { private Producer<byte[]> producer; AtLeastOnceProcessor(PulsarClient client, - FunctionDetails functionDetails, - SubscriptionType subType) { - super(client, functionDetails, subType); + FunctionDetails functionDetails) { + super(client, functionDetails); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java index 08cf111..30b1630 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java @@ -38,9 +38,8 @@ class AtMostOnceProcessor extends MessageProcessorBase { private Producer<byte[]> producer; AtMostOnceProcessor(PulsarClient client, - FunctionDetails functionDetails, - SubscriptionType subType) { - super(client, functionDetails, subType); + FunctionDetails functionDetails) { + super(client, functionDetails); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java index aafca69..06a463b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java @@ -30,7 +30,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.connect.core.Record; -import org.apache.pulsar.functions.instance.PulsarRecord; +import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers; import org.apache.pulsar.functions.instance.producers.Producers; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -46,15 +46,7 @@ class EffectivelyOnceProcessor extends MessageProcessorBase implements ConsumerE EffectivelyOnceProcessor(PulsarClient client, FunctionDetails functionDetails) { - super(client, functionDetails, SubscriptionType.Failover); - } - - /** - * An effectively-once processor can only use `Failover` subscription. - */ - @Override - protected SubscriptionType getSubscriptionType() { - return SubscriptionType.Failover; + super(client, functionDetails); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java index fd22adb..0dcf12c 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java @@ -24,12 +24,11 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.connect.core.Record; import org.apache.pulsar.connect.core.Source; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.proto.Function.FunctionDetails; -import org.apache.pulsar.functions.proto.Function.FunctionDetails.ProcessingGuarantees; +import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; /** * A processor that processes messages, used by {@link org.apache.pulsar.functions.instance.JavaInstanceRunnable}. @@ -39,16 +38,7 @@ public interface MessageProcessor extends AutoCloseable { static MessageProcessor create(PulsarClient client, FunctionDetails functionDetails) { - FunctionDetails.SubscriptionType fnSubType = functionDetails.getSubscriptionType(); ProcessingGuarantees processingGuarantees = functionDetails.getProcessingGuarantees(); - SubscriptionType subType; - if (FunctionDetails.SubscriptionType.SHARED == fnSubType) { - subType = SubscriptionType.Shared; - } else if (FunctionDetails.SubscriptionType.EXCLUSIVE == fnSubType) { - subType = SubscriptionType.Exclusive; - } else { - subType = SubscriptionType.Failover; - } if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) { return new EffectivelyOnceProcessor( @@ -57,25 +47,23 @@ public interface MessageProcessor extends AutoCloseable { } else if (processingGuarantees == ProcessingGuarantees.ATMOST_ONCE) { return new AtMostOnceProcessor( client, - functionDetails, - subType); + functionDetails); } else { return new AtLeastOnceProcessor( client, - functionDetails, - subType); + functionDetails); } } void postReceiveMessage(Record record); /** - * Setup the input with a provided <i>processQueue</i>. The implementation of this processor is responsible for - * setting up the input and passing the received messages from input to the provided <i>processQueue</i>. - * - * @param inputSerDe SerDe to deserialize messages from input. + * Setup the source. Implementation is responsible for initializing the source + * and for calling open method for source + * @param inputType the input type of the function + * @throws Exception */ - void setupInput(Map<String, SerDe> inputSerDe) + void setupInput(Class<?> inputType) throws Exception; /** diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java index 06bc175..a2a5d8b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java @@ -18,10 +18,9 @@ */ package org.apache.pulsar.functions.instance.processors; -import java.util.LinkedList; -import java.util.List; import java.util.Map; +import com.google.gson.Gson; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -31,9 +30,10 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.connect.core.Record; import org.apache.pulsar.connect.core.Source; import org.apache.pulsar.functions.api.SerDe; -import org.apache.pulsar.functions.instance.PulsarConfig; -import org.apache.pulsar.functions.instance.PulsarSource; +import org.apache.pulsar.functions.source.PulsarConfig; +import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; @@ -45,22 +45,15 @@ abstract class MessageProcessorBase implements MessageProcessor { protected final PulsarClient client; protected final FunctionDetails functionDetails; - protected final SubscriptionType subType; @Getter protected Source source; - protected List<String> topics; protected MessageProcessorBase(PulsarClient client, - FunctionDetails functionDetails, - SubscriptionType subType) { + FunctionDetails functionDetails) { this.client = client; this.functionDetails = functionDetails; - this.subType = subType; - this.topics = new LinkedList<>(); - this.topics.addAll(this.functionDetails.getInputsList()); - this.topics.addAll(this.functionDetails.getCustomSerdeInputsMap().keySet()); } // @@ -68,26 +61,31 @@ abstract class MessageProcessorBase implements MessageProcessor { // @Override - public void setupInput(Map<String, SerDe> inputSerDe) throws Exception { + public void setupInput(Class<?> inputType) throws Exception { - org.apache.pulsar.functions.proto.Function.ConnectorDetails connectorDetails = this.functionDetails.getSource(); + org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = this.functionDetails.getSource(); Object object; - if (connectorDetails.getClassName().equals(PulsarSource.class.getName())) { - PulsarConfig pulsarConfig = PulsarConfig.builder() - .topicToSerdeMap(inputSerDe) - .subscription(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails)) - .processingGuarantees(this.functionDetails.getProcessingGuarantees()) - .subscriptionType(this.subType) - .build(); + if (sourceSpec.getClassName().equals(PulsarSource.class.getName())) { + + PulsarConfig pulsarConfig = new PulsarConfig(); + pulsarConfig.setTopicSerdeClassNameMap(this.functionDetails.getSource().getTopicsToSerDeClassNameMap()); + pulsarConfig.setSubscriptionName(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails)); + pulsarConfig.setProcessingGuarantees( + FunctionConfig.ProcessingGuarantees.valueOf(this.functionDetails.getProcessingGuarantees().name())); + pulsarConfig.setSubscriptionType( + FunctionConfig.SubscriptionType.valueOf(this.functionDetails.getSource().getSubscriptionType().name())); + pulsarConfig.setTypeClassName(inputType.getName()); + Object[] params = {this.client, pulsarConfig}; Class[] paramTypes = {PulsarClient.class, PulsarConfig.class}; + object = Reflections.createInstance( - connectorDetails.getClassName(), + sourceSpec.getClassName(), PulsarSource.class.getClassLoader(), params, paramTypes); } else { object = Reflections.createInstance( - connectorDetails.getClassName(), + sourceSpec.getClassName(), Thread.currentThread().getContextClassLoader()); } @@ -101,7 +99,7 @@ abstract class MessageProcessorBase implements MessageProcessor { this.source = (Source) object; try { - this.source.open(connectorDetails.getConfigsMap()); + this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), Map.class)); } catch (Exception e) { log.info("Error occurred executing open for source: {}", this.functionDetails.getSource().getClassName(), e); @@ -109,10 +107,6 @@ abstract class MessageProcessorBase implements MessageProcessor { } - protected SubscriptionType getSubscriptionType() { - return subType; - } - public Record recieveMessage() throws Exception { return this.source.read(); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java similarity index 58% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java index c812a77..2a5dc44 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java @@ -16,27 +16,33 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.instance; +package org.apache.pulsar.functions.source; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Builder; import lombok.Data; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.functions.api.SerDe; -import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.utils.FunctionConfig; +import java.io.IOException; import java.util.Map; @Getter @Setter -@Data -@Builder @ToString public class PulsarConfig { - private Function.FunctionDetails.ProcessingGuarantees processingGuarantees; - private SubscriptionType subscriptionType; - private String subscription; - private Map<String, SerDe> topicToSerdeMap; + + private FunctionConfig.ProcessingGuarantees processingGuarantees; + private FunctionConfig.SubscriptionType subscriptionType; + private String subscriptionName; + private Map<String, String> topicSerdeClassNameMap; + private String typeClassName; + + public static PulsarConfig load(Map<String, Object> map) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(new ObjectMapper().writeValueAsString(map), PulsarConfig.class); + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java similarity index 97% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java index 2bdbdb1..7211db1 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.instance; +package org.apache.pulsar.functions.source; import lombok.Builder; import lombok.Data; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java similarity index 62% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 43d9350..caaa7bf 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -16,19 +16,24 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.instance; +package org.apache.pulsar.functions.source; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import net.jodah.typetools.TypeResolver; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.connect.core.Record; import org.apache.pulsar.connect.core.Source; -import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.api.utils.DefaultSerDe; +import org.apache.pulsar.functions.instance.InstanceUtils; +import org.apache.pulsar.functions.utils.FunctionConfig; import java.util.ArrayList; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -37,6 +42,7 @@ public class PulsarSource<T> implements Source<T> { private PulsarClient pulsarClient; private PulsarConfig pulsarConfig; + private Map<String, SerDe> topicToSerDeMap = new HashMap<>(); @Getter private org.apache.pulsar.client.api.Consumer inputConsumer; @@ -48,10 +54,14 @@ public class PulsarSource<T> implements Source<T> { @Override public void open(Map<String, Object> config) throws Exception { + // Setup Serialization/Deserialization + setupSerde(); + + // Setup pulsar consumer this.inputConsumer = this.pulsarClient.newConsumer() - .topics(new ArrayList<>(this.pulsarConfig.getTopicToSerdeMap().keySet())) - .subscriptionName(this.pulsarConfig.getSubscription()) - .subscriptionType(this.pulsarConfig.getSubscriptionType()) + .topics(new ArrayList<>(this.pulsarConfig.getTopicSerdeClassNameMap().keySet())) + .subscriptionName(this.pulsarConfig.getSubscriptionName()) + .subscriptionType(this.pulsarConfig.getSubscriptionType().get()) .ackTimeout(1, TimeUnit.MINUTES) .subscribe(); } @@ -71,13 +81,13 @@ public class PulsarSource<T> implements Source<T> { MessageIdImpl messageId = (MessageIdImpl) topicMessageId.getInnerMessageId(); partitionId = Long.toString(messageId.getPartitionIndex()); } else { - topicName = this.pulsarConfig.getTopicToSerdeMap().keySet().iterator().next(); + topicName = this.pulsarConfig.getTopicSerdeClassNameMap().keySet().iterator().next(); partitionId = Long.toString(((MessageIdImpl) message.getMessageId()).getPartitionIndex()); } Object object; try { - object = this.pulsarConfig.getTopicToSerdeMap().get(topicName).deserialize(message.getData()); + object = this.topicToSerDeMap.get(topicName).deserialize(message.getData()); } catch (Exception e) { //TODO Add deserialization exception stats throw new RuntimeException("Error occured when attempting to deserialize input:", e); @@ -97,15 +107,13 @@ public class PulsarSource<T> implements Source<T> { .sequenceId(message.getSequenceId()) .topicName(topicName) .ackFunction(() -> { - if (pulsarConfig.getProcessingGuarantees() - == Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) { + if (pulsarConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { inputConsumer.acknowledgeCumulativeAsync(message); } else { inputConsumer.acknowledgeAsync(message); } }).failFunction(() -> { - if (pulsarConfig.getProcessingGuarantees() - == Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) { + if (pulsarConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { throw new RuntimeException("Failed to process message: " + message.getMessageId()); } }) @@ -117,4 +125,37 @@ public class PulsarSource<T> implements Source<T> { public void close() throws Exception { this.inputConsumer.close(); } + + private void setupSerde() throws ClassNotFoundException { + + Class<?> typeArg = Class.forName(this.pulsarConfig.getTypeClassName()); + if (Void.class.equals(typeArg)) { + throw new RuntimeException("Input type of Pulsar Function cannot be Void"); + } + + for (Map.Entry<String, String> entry : this.pulsarConfig.getTopicSerdeClassNameMap().entrySet()) { + String topic = entry.getKey(); + String serDeClassname = entry.getValue(); + if (serDeClassname.isEmpty()) { + serDeClassname = DefaultSerDe.class.getName(); + } + SerDe serDe = InstanceUtils.initializeSerDe(serDeClassname, + Thread.currentThread().getContextClassLoader(), typeArg); + this.topicToSerDeMap.put(topic, serDe); + } + + for (SerDe serDe : this.topicToSerDeMap.values()) { + if (serDe.getClass().getName().equals(DefaultSerDe.class.getName())) { + if (!DefaultSerDe.IsSupportedType(typeArg)) { + throw new RuntimeException("Default Serde does not support " + typeArg); + } + } else { + Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); + if (!typeArg.isAssignableFrom(inputSerdeTypeArgs[0])) { + throw new RuntimeException("Inconsistent types found between function input type and input serde type: " + + " function type = " + typeArg + " should be assignable from " + inputSerdeTypeArgs[0]); + } + } + } + } } diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py index 0f65991..b988383 100644 --- a/pulsar-functions/instance/src/main/python/Function_pb2.py +++ b/pulsar-functions/instance/src/main/python/Function_pb2.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -18,13 +17,12 @@ # under the License. # -# -*- encoding: utf-8 -*- - # Generated by the protocol buffer compiler. DO NOT EDIT! # source: Function.proto import sys _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf.internal import enum_type_wrapper from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection @@ -41,14 +39,12 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='Function.proto', package='proto', syntax='proto3', - serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xd5\x06\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12H\n\x11\x63ustomSerdeInputs\x18\x05 \x03(\x0b\x32-.proto.FunctionDetails.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12\x10\n\x08logTopic\x18\x08 \x01(\t\x12I\n\x14processingGuarantees\x18 [...] + serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xf9\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x1c\n\x14outputSerdeClassName\x18\x05 \x01(\t\x12\x0e\n\x06output\x18\x06 \x01(\t\x12\x10\n\x08logTopic\x18\x07 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x08 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12:\n\nuserConfig\x18\t \x03(\x0b\x32&.proto. [...] ) - - -_FUNCTIONDETAILS_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( +_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( name='ProcessingGuarantees', - full_name='proto.FunctionDetails.ProcessingGuarantees', + full_name='proto.ProcessingGuarantees', filename=None, file=DESCRIPTOR, values=[ @@ -67,14 +63,15 @@ _FUNCTIONDETAILS_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=706, - serialized_end=785, + serialized_start=1187, + serialized_end=1266, ) -_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_PROCESSINGGUARANTEES) +_sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES) -_FUNCTIONDETAILS_SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor( +ProcessingGuarantees = enum_type_wrapper.EnumTypeWrapper(_PROCESSINGGUARANTEES) +_SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor( name='SubscriptionType', - full_name='proto.FunctionDetails.SubscriptionType', + full_name='proto.SubscriptionType', filename=None, file=DESCRIPTOR, values=[ @@ -83,20 +80,24 @@ _FUNCTIONDETAILS_SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor( options=None, type=None), _descriptor.EnumValueDescriptor( - name='EXCLUSIVE', index=1, number=1, - options=None, - type=None), - _descriptor.EnumValueDescriptor( - name='FAILOVER', index=2, number=2, + name='FAILOVER', index=1, number=1, options=None, type=None), ], containing_type=None, options=None, - serialized_start=787, - serialized_end=846, + serialized_start=1268, + serialized_end=1312, ) -_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_SUBSCRIPTIONTYPE) +_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE) + +SubscriptionType = enum_type_wrapper.EnumTypeWrapper(_SUBSCRIPTIONTYPE) +ATLEAST_ONCE = 0 +ATMOST_ONCE = 1 +EFFECTIVELY_ONCE = 2 +SHARED = 0 +FAILOVER = 1 + _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor( name='Runtime', @@ -115,49 +116,12 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=848, - serialized_end=879, + serialized_start=500, + serialized_end=531, ) _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME) -_FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY = _descriptor.Descriptor( - name='CustomSerdeInputsEntry', - full_name='proto.FunctionDetails.CustomSerdeInputsEntry', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='key', full_name='proto.FunctionDetails.CustomSerdeInputsEntry.key', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='value', full_name='proto.FunctionDetails.CustomSerdeInputsEntry.value', index=1, - number=2, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')), - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=597, - serialized_end=653, -) - _FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor( name='UserConfigEntry', full_name='proto.FunctionDetails.UserConfigEntry', @@ -191,8 +155,8 @@ _FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=655, - serialized_end=704, + serialized_start=449, + serialized_end=498, ) _FUNCTIONDETAILS = _descriptor.Descriptor( @@ -231,85 +195,71 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='customSerdeInputs', full_name='proto.FunctionDetails.customSerdeInputs', index=4, - number=5, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], + name='outputSerdeClassName', full_name='proto.FunctionDetails.outputSerdeClassName', index=4, + number=5, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='outputSerdeClassName', full_name='proto.FunctionDetails.outputSerdeClassName', index=5, + name='output', full_name='proto.FunctionDetails.output', index=5, number=6, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='output', full_name='proto.FunctionDetails.output', index=6, + name='logTopic', full_name='proto.FunctionDetails.logTopic', index=6, number=7, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='logTopic', full_name='proto.FunctionDetails.logTopic', index=7, - number=8, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='processingGuarantees', full_name='proto.FunctionDetails.processingGuarantees', index=8, - number=9, type=14, cpp_type=8, label=1, + name='processingGuarantees', full_name='proto.FunctionDetails.processingGuarantees', index=7, + number=8, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='userConfig', full_name='proto.FunctionDetails.userConfig', index=9, - number=10, type=11, cpp_type=10, label=3, + name='userConfig', full_name='proto.FunctionDetails.userConfig', index=8, + number=9, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='subscriptionType', full_name='proto.FunctionDetails.subscriptionType', index=10, - number=11, type=14, cpp_type=8, label=1, + name='runtime', full_name='proto.FunctionDetails.runtime', index=9, + number=10, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='runtime', full_name='proto.FunctionDetails.runtime', index=11, - number=12, type=14, cpp_type=8, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='autoAck', full_name='proto.FunctionDetails.autoAck', index=12, - number=13, type=8, cpp_type=7, label=1, + name='autoAck', full_name='proto.FunctionDetails.autoAck', index=10, + number=11, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='inputs', full_name='proto.FunctionDetails.inputs', index=13, - number=14, type=9, cpp_type=9, label=3, - has_default_value=False, default_value=[], + name='parallelism', full_name='proto.FunctionDetails.parallelism', index=11, + number=12, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='parallelism', full_name='proto.FunctionDetails.parallelism', index=14, - number=15, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, + name='source', full_name='proto.FunctionDetails.source', index=12, + number=13, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='source', full_name='proto.FunctionDetails.source', index=15, - number=16, type=11, cpp_type=10, label=1, + name='sink', full_name='proto.FunctionDetails.sink', index=13, + number=14, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -317,10 +267,8 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( ], extensions=[ ], - nested_types=[_FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY, _FUNCTIONDETAILS_USERCONFIGENTRY, ], + nested_types=[_FUNCTIONDETAILS_USERCONFIGENTRY, ], enum_types=[ - _FUNCTIONDETAILS_PROCESSINGGUARANTEES, - _FUNCTIONDETAILS_SUBSCRIPTIONTYPE, _FUNCTIONDETAILS_RUNTIME, ], options=None, @@ -330,26 +278,26 @@ _FUNCTIONDETAILS = _descriptor.Descriptor( oneofs=[ ], serialized_start=26, - serialized_end=879, + serialized_end=531, ) -_CONNECTORDETAILS_CONFIGSENTRY = _descriptor.Descriptor( - name='ConfigsEntry', - full_name='proto.ConnectorDetails.ConfigsEntry', +_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor( + name='TopicsToSerDeClassNameEntry', + full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='key', full_name='proto.ConnectorDetails.ConfigsEntry.key', index=0, + name='key', full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.key', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='value', full_name='proto.ConnectorDetails.ConfigsEntry.value', index=1, + name='value', full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.value', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -367,26 +315,40 @@ _CONNECTORDETAILS_CONFIGSENTRY = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=976, - serialized_end=1022, + serialized_start=714, + serialized_end=775, ) -_CONNECTORDETAILS = _descriptor.Descriptor( - name='ConnectorDetails', - full_name='proto.ConnectorDetails', +_SOURCESPEC = _descriptor.Descriptor( + name='SourceSpec', + full_name='proto.SourceSpec', filename=None, file=DESCRIPTOR, containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='className', full_name='proto.ConnectorDetails.className', index=0, + name='className', full_name='proto.SourceSpec.className', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='configs', full_name='proto.ConnectorDetails.configs', index=1, + name='configs', full_name='proto.SourceSpec.configs', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=2, + number=3, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=3, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, @@ -395,7 +357,7 @@ _CONNECTORDETAILS = _descriptor.Descriptor( ], extensions=[ ], - nested_types=[_CONNECTORDETAILS_CONFIGSENTRY, ], + nested_types=[_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY, ], enum_types=[ ], options=None, @@ -404,8 +366,46 @@ _CONNECTORDETAILS = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=882, - serialized_end=1022, + serialized_start=534, + serialized_end=775, +) + + +_SINKSPEC = _descriptor.Descriptor( + name='SinkSpec', + full_name='proto.SinkSpec', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='className', full_name='proto.SinkSpec.className', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='configs', full_name='proto.SinkSpec.configs', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=777, + serialized_end=823, ) @@ -435,8 +435,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1024, - serialized_end=1070, + serialized_start=825, + serialized_end=871, ) @@ -487,8 +487,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1073, - serialized_end=1234, + serialized_start=874, + serialized_end=1035, ) @@ -525,8 +525,8 @@ _INSTANCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1236, - serialized_end=1317, + serialized_start=1037, + serialized_end=1118, ) @@ -563,44 +563,37 @@ _ASSIGNMENT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1319, - serialized_end=1384, + serialized_start=1120, + serialized_end=1185, ) -_FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY.containing_type = _FUNCTIONDETAILS _FUNCTIONDETAILS_USERCONFIGENTRY.containing_type = _FUNCTIONDETAILS -_FUNCTIONDETAILS.fields_by_name['customSerdeInputs'].message_type = _FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY -_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _FUNCTIONDETAILS_PROCESSINGGUARANTEES +_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES _FUNCTIONDETAILS.fields_by_name['userConfig'].message_type = _FUNCTIONDETAILS_USERCONFIGENTRY -_FUNCTIONDETAILS.fields_by_name['subscriptionType'].enum_type = _FUNCTIONDETAILS_SUBSCRIPTIONTYPE _FUNCTIONDETAILS.fields_by_name['runtime'].enum_type = _FUNCTIONDETAILS_RUNTIME -_FUNCTIONDETAILS.fields_by_name['source'].message_type = _CONNECTORDETAILS -_FUNCTIONDETAILS_PROCESSINGGUARANTEES.containing_type = _FUNCTIONDETAILS -_FUNCTIONDETAILS_SUBSCRIPTIONTYPE.containing_type = _FUNCTIONDETAILS +_FUNCTIONDETAILS.fields_by_name['source'].message_type = _SOURCESPEC +_FUNCTIONDETAILS.fields_by_name['sink'].message_type = _SINKSPEC _FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS -_CONNECTORDETAILS_CONFIGSENTRY.containing_type = _CONNECTORDETAILS -_CONNECTORDETAILS.fields_by_name['configs'].message_type = _CONNECTORDETAILS_CONFIGSENTRY +_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC +_SOURCESPEC.fields_by_name['subscriptionType'].enum_type = _SUBSCRIPTIONTYPE +_SOURCESPEC.fields_by_name['topicsToSerDeClassName'].message_type = _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY _FUNCTIONMETADATA.fields_by_name['functionDetails'].message_type = _FUNCTIONDETAILS _FUNCTIONMETADATA.fields_by_name['packageLocation'].message_type = _PACKAGELOCATIONMETADATA _INSTANCE.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA _ASSIGNMENT.fields_by_name['instance'].message_type = _INSTANCE DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS -DESCRIPTOR.message_types_by_name['ConnectorDetails'] = _CONNECTORDETAILS +DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC +DESCRIPTOR.message_types_by_name['SinkSpec'] = _SINKSPEC DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = _PACKAGELOCATIONMETADATA DESCRIPTOR.message_types_by_name['FunctionMetaData'] = _FUNCTIONMETADATA DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE DESCRIPTOR.message_types_by_name['Assignment'] = _ASSIGNMENT +DESCRIPTOR.enum_types_by_name['ProcessingGuarantees'] = _PROCESSINGGUARANTEES +DESCRIPTOR.enum_types_by_name['SubscriptionType'] = _SUBSCRIPTIONTYPE _sym_db.RegisterFileDescriptor(DESCRIPTOR) FunctionDetails = _reflection.GeneratedProtocolMessageType('FunctionDetails', (_message.Message,), dict( - CustomSerdeInputsEntry = _reflection.GeneratedProtocolMessageType('CustomSerdeInputsEntry', (_message.Message,), dict( - DESCRIPTOR = _FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY, - __module__ = 'Function_pb2' - # @@protoc_insertion_point(class_scope:proto.FunctionDetails.CustomSerdeInputsEntry) - )) - , - UserConfigEntry = _reflection.GeneratedProtocolMessageType('UserConfigEntry', (_message.Message,), dict( DESCRIPTOR = _FUNCTIONDETAILS_USERCONFIGENTRY, __module__ = 'Function_pb2' @@ -612,23 +605,29 @@ FunctionDetails = _reflection.GeneratedProtocolMessageType('FunctionDetails', (_ # @@protoc_insertion_point(class_scope:proto.FunctionDetails) )) _sym_db.RegisterMessage(FunctionDetails) -_sym_db.RegisterMessage(FunctionDetails.CustomSerdeInputsEntry) _sym_db.RegisterMessage(FunctionDetails.UserConfigEntry) -ConnectorDetails = _reflection.GeneratedProtocolMessageType('ConnectorDetails', (_message.Message,), dict( +SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', (_message.Message,), dict( - ConfigsEntry = _reflection.GeneratedProtocolMessageType('ConfigsEntry', (_message.Message,), dict( - DESCRIPTOR = _CONNECTORDETAILS_CONFIGSENTRY, + TopicsToSerDeClassNameEntry = _reflection.GeneratedProtocolMessageType('TopicsToSerDeClassNameEntry', (_message.Message,), dict( + DESCRIPTOR = _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY, __module__ = 'Function_pb2' - # @@protoc_insertion_point(class_scope:proto.ConnectorDetails.ConfigsEntry) + # @@protoc_insertion_point(class_scope:proto.SourceSpec.TopicsToSerDeClassNameEntry) )) , - DESCRIPTOR = _CONNECTORDETAILS, + DESCRIPTOR = _SOURCESPEC, + __module__ = 'Function_pb2' + # @@protoc_insertion_point(class_scope:proto.SourceSpec) + )) +_sym_db.RegisterMessage(SourceSpec) +_sym_db.RegisterMessage(SourceSpec.TopicsToSerDeClassNameEntry) + +SinkSpec = _reflection.GeneratedProtocolMessageType('SinkSpec', (_message.Message,), dict( + DESCRIPTOR = _SINKSPEC, __module__ = 'Function_pb2' - # @@protoc_insertion_point(class_scope:proto.ConnectorDetails) + # @@protoc_insertion_point(class_scope:proto.SinkSpec) )) -_sym_db.RegisterMessage(ConnectorDetails) -_sym_db.RegisterMessage(ConnectorDetails.ConfigsEntry) +_sym_db.RegisterMessage(SinkSpec) PackageLocationMetaData = _reflection.GeneratedProtocolMessageType('PackageLocationMetaData', (_message.Message,), dict( DESCRIPTOR = _PACKAGELOCATIONMETADATA, @@ -661,10 +660,8 @@ _sym_db.RegisterMessage(Assignment) DESCRIPTOR.has_options = True DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\010Function')) -_FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY.has_options = True -_FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) _FUNCTIONDETAILS_USERCONFIGENTRY.has_options = True _FUNCTIONDETAILS_USERCONFIGENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) -_CONNECTORDETAILS_CONFIGSENTRY.has_options = True -_CONNECTORDETAILS_CONFIGSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) +_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.has_options = True +_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) # @@protoc_insertion_point(module_scope) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 53ca3f0..947ff23 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -110,8 +110,8 @@ class PythonInstance(object): self.function_purefunction = None self.producer = None self.exeuction_thread = None - self.atmost_once = self.instance_config.function_details.processingGuarantees == Function_pb2.FunctionDetails.ProcessingGuarantees.Value('ATMOST_ONCE') - self.atleast_once = self.instance_config.function_details.processingGuarantees == Function_pb2.FunctionDetails.ProcessingGuarantees.Value('ATLEAST_ONCE') + self.atmost_once = self.instance_config.function_details.processingGuarantees == Function_pb2.ProcessingGuarantees.Value('ATMOST_ONCE') + self.atleast_once = self.instance_config.function_details.processingGuarantees == Function_pb2.ProcessingGuarantees.Value('ATLEAST_ONCE') self.auto_ack = self.instance_config.function_details.autoAck self.contextimpl = None self.total_stats = Stats() @@ -120,27 +120,17 @@ class PythonInstance(object): def run(self): # Setup consumers and input deserializers mode = pulsar._pulsar.ConsumerType.Shared - if self.instance_config.function_details.subscriptionType == Function_pb2.FunctionDetails.SubscriptionType.Value('EXCLUSIVE'): - mode = pulsar._pulsar.ConsumerType.Exclusive - elif self.instance_config.function_details.subscriptionType == Function_pb2.FunctionDetails.SubscriptionType.Value('FAILOVER'): + if self.instance_config.function_details.source.subscriptionType == Function_pb2.SubscriptionType.Value("FAILOVER"): mode = pulsar._pulsar.ConsumerType.Failover subscription_name = str(self.instance_config.function_details.tenant) + "/" + \ str(self.instance_config.function_details.namespace) + "/" + \ str(self.instance_config.function_details.name) - for topic, serde in self.instance_config.function_details.customSerdeInputs.items(): - serde_kclass = util.import_class(os.path.dirname(self.user_code), serde) - self.input_serdes[topic] = serde_kclass() - Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) - self.consumers[topic] = self.pulsar_client.subscribe( - str(topic), subscription_name, - consumer_type=mode, - message_listener=partial(self.message_listener, topic, self.input_serdes[topic]) - ) - - for topic in self.instance_config.function_details.inputs: - global DEFAULT_SERIALIZER - serde_kclass = util.import_class(os.path.dirname(self.user_code), DEFAULT_SERIALIZER) + for topic, serde in self.instance_config.function_details.source.topicsToSerDeClassName.items(): + if not serde: + serde_kclass = util.import_class(os.path.dirname(self.user_code), DEFAULT_SERIALIZER) + else: + serde_kclass = util.import_class(os.path.dirname(self.user_code), serde) self.input_serdes[topic] = serde_kclass() Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) self.consumers[topic] = self.pulsar_client.subscribe( diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py index 8a29dc5..59ebce6 100644 --- a/pulsar-functions/instance/src/main/python/python_instance_main.py +++ b/pulsar-functions/instance/src/main/python/python_instance_main.py @@ -58,16 +58,14 @@ def main(): parser.add_argument('--name', required=True, help='Function Name') parser.add_argument('--tenant', required=True, help='Tenant Name') parser.add_argument('--namespace', required=True, help='Namespace name') - parser.add_argument('--custom_serde_input_topics', required=False, help='Input Topics Requiring Custom Deserialization') - parser.add_argument('--custom_serde_classnames', required=False, help='Input Serde Classnames') - parser.add_argument('--input_topics', required=False, help='Input topics with default serde') + parser.add_argument('--source_topics_serde_classname', required=True, help='A mapping of Input topics to SerDe') parser.add_argument('--output_topic', required=False, help='Output Topic') parser.add_argument('--output_serde_classname', required=False, help='Output Serde Classnames') parser.add_argument('--instance_id', required=True, help='Instance Id') parser.add_argument('--function_id', required=True, help='Function Id') parser.add_argument('--function_version', required=True, help='Function Version') parser.add_argument('--processing_guarantees', required=True, help='Processing Guarantees') - parser.add_argument('--subscription_type', required=True, help='Subscription Type') + parser.add_argument('--source_subscription_type', required=True, help='Subscription Type') parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar Service Url') parser.add_argument('--port', required=True, help='Instance Port', type=int) parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples') @@ -91,26 +89,25 @@ def main(): function_details.namespace = args.namespace function_details.name = args.name function_details.className = args.function_classname - if args.custom_serde_input_topics is None and args.input_topics is None: - Log.critical("Atleast one input topic must be present") + + sourceSpec = Function_pb2.SourceSpec() + sourceSpec.subscriptionType = Function_pb2.SubscriptionType.Value(args.source_subscription_type) + try: + source_topics_serde_classname_dict = json.loads(args.source_topics_serde_classname) + except ValueError: + log.critical("Cannot decode source_topics_serde_classname. This argument must be specifed as a JSON") sys.exit(1) - if args.custom_serde_input_topics is not None and args.custom_serde_classnames is not None: - input_topics = args.custom_serde_input_topics.split(",") - input_serde = args.custom_serde_classnames.split(",") - if len(input_topics) != len(input_serde): - Log.critical("CustomSerde InputTopcis and Serde classnames should match") - sys.exit(1) - for i in xrange(len(input_topics)): - function_details.customSerdeInputs[input_topics[i]] = input_serde[i] - if args.input_topics is not None: - for topic in args.input_topics.split(","): - function_details.inputs.append(topic) + if not source_topics_serde_classname_dict: + log.critical("source_topics_serde_classname cannot be empty") + for topics, serde_classname in source_topics_serde_classname_dict.items(): + sourceSpec.topicsToSerDeClassName[topics] = serde_classname + function_details.source.MergeFrom(sourceSpec) + if args.output_topic != None and len(args.output_topic) != 0: function_details.output = args.output_topic if args.output_serde_classname != None and len(args.output_serde_classname) != 0: function_details.outputSerdeClassName = args.output_serde_classname - function_details.processingGuarantees = Function_pb2.FunctionDetails.ProcessingGuarantees.Value(args.processing_guarantees) - function_details.subscriptionType = Function_pb2.FunctionDetails.SubscriptionType.Value(args.subscription_type) + function_details.processingGuarantees = Function_pb2.ProcessingGuarantees.Value(args.processing_guarantees) if args.auto_ack == "true": function_details.autoAck = True else: diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index a1672ba..898d777 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -49,11 +49,6 @@ public class JavaInstanceRunnableTest { private static InstanceConfig createInstanceConfig(boolean addCustom, String outputSerde) { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - if (!addCustom) { - functionDetailsBuilder.addInputs("TEST"); - } else { - functionDetailsBuilder.putCustomSerdeInputs("TEST", IntegerSerDe.class.getName()); - } if (outputSerde != null) { functionDetailsBuilder.setOutputSerdeClassName(outputSerde); } @@ -117,26 +112,6 @@ public class JavaInstanceRunnableTest { } /** - * Verify that JavaInstance does not support functions that take Void type as input - */ - @Test - public void testVoidInputClasses() { - try { - JavaInstanceRunnable runnable = createRunnable(false, DefaultSerDe.class.getName()); - Method method = makeAccessible(runnable); - VoidInputHandler pulsarFunction = new VoidInputHandler(); - ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass()); - method.invoke(runnable, typeArgs, clsLoader); - assertFalse(true); - } catch (InvocationTargetException ex) { - // Good - } catch (Exception ex) { - assertFalse(true); - } - } - - /** * Verify that JavaInstance does support functions that output Void type */ @Test @@ -154,26 +129,6 @@ public class JavaInstanceRunnableTest { } /** - * Verify that function input type should be consistent with input serde type. - */ - @Test - public void testInconsistentInputType() { - try { - JavaInstanceRunnable runnable = createRunnable(true, DefaultSerDe.class.getName()); - Method method = makeAccessible(runnable); - ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - Function function = (Function<String, String>) (input, context) -> input + "-lambda"; - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass()); - method.invoke(runnable, typeArgs, clsLoader); - fail("Should fail constructing java instance if function type is inconsistent with serde type"); - } catch (InvocationTargetException ex) { - assertTrue(ex.getCause().getMessage().startsWith("Inconsistent types found between function input type and input serde type:")); - } catch (Exception ex) { - assertTrue(false); - } - } - - /** * Verify that Default Serializer works fine. */ @Test diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index c0aae7c..e83a706 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -26,15 +26,13 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.source.PulsarSource; import org.testng.annotations.Test; -import java.util.HashMap; - public class JavaInstanceTest { private static InstanceConfig createInstanceConfig() { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - functionDetailsBuilder.addInputs("TEST"); functionDetailsBuilder.setOutputSerdeClassName(DefaultSerDe.class.getName()); InstanceConfig instanceConfig = new InstanceConfig(); instanceConfig.setFunctionDetails(functionDetailsBuilder.build()); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java new file mode 100644 index 0000000..558517a --- /dev/null +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.source; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.functions.api.SerDe; +import org.apache.pulsar.functions.api.utils.DefaultSerDe; +import org.apache.pulsar.functions.utils.FunctionConfig; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; + +@Slf4j +public class PulsarSourceTest { + + private static final String SUBSCRIPTION_NAME = "test/test-namespace/example"; + private static Map<String, String> topicSerdeClassNameMap = new HashMap<>(); + static { + topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", DefaultSerDe.class.getName()); + } + + public static class TestSerDe implements SerDe<String> { + + @Override + public String deserialize(byte[] input) { + return null; + } + + @Override + public byte[] serialize(String input) { + return new byte[0]; + } + } + + /** + * Verify that JavaInstance does not support functions that take Void type as input + */ + + private static PulsarClient getPulsarClient() throws PulsarClientException { + PulsarClient pulsarClient = mock(PulsarClient.class); + ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class); + doReturn(consumerBuilder).when(consumerBuilder).topics(anyList()); + doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString()); + doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any()); + doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any()); + Consumer consumer = mock(Consumer.class); + doReturn(consumer).when(consumerBuilder).subscribe(); + doReturn(consumerBuilder).when(pulsarClient).newConsumer(); + return pulsarClient; + } + + private static PulsarConfig getPulsarConfigs() { + PulsarConfig pulsarConfig = new PulsarConfig(); + pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); + pulsarConfig.setSubscriptionType(FunctionConfig.SubscriptionType.FAILOVER); + pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); + pulsarConfig.setTypeClassName(String.class.getName()); + return pulsarConfig; + } + + @Test + public void testVoidInputClasses() throws IOException { + PulsarConfig pulsarConfig = getPulsarConfigs(); + // set type to void + pulsarConfig.setTypeClassName(Void.class.getName()); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + + try { + pulsarSource.open(new HashMap<>()); + assertFalse(true); + } catch (RuntimeException ex) { + log.error("RuntimeException: {}", ex, ex); + assertEquals(ex.getMessage(), "Input type of Pulsar Function cannot be Void"); + } catch (Exception ex) { + log.error("Exception: {}", ex, ex); + assertFalse(true); + } + } + + /** + * Verify that function input type should be consistent with input serde type. + */ + @Test + public void testInconsistentInputType() throws IOException { + PulsarConfig pulsarConfig = getPulsarConfigs(); + // set type to be inconsistent to that of SerDe + pulsarConfig.setTypeClassName(Integer.class.getName()); + Map<String, String> topicSerdeClassNameMap = new HashMap<>(); + topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", TestSerDe.class.getName()); + pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); + PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); + try { + pulsarSource.open(new HashMap<>()); + fail("Should fail constructing java instance if function type is inconsistent with serde type"); + } catch (RuntimeException ex) { + log.error("RuntimeException: {}", ex, ex); + assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and input serde type:")); + } catch (Exception ex) { + log.error("Exception: {}", ex, ex); + assertTrue(false); + } + } +} diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto index 30dd9b5..063b6aa 100644 --- a/pulsar-functions/proto/src/main/proto/Function.proto +++ b/pulsar-functions/proto/src/main/proto/Function.proto @@ -23,17 +23,18 @@ package proto; option java_package = "org.apache.pulsar.functions.proto"; option java_outer_classname = "Function"; +enum ProcessingGuarantees { + ATLEAST_ONCE = 0; // [default value] + ATMOST_ONCE = 1; + EFFECTIVELY_ONCE = 2; +} + +enum SubscriptionType { + SHARED = 0; + FAILOVER = 1; +} + message FunctionDetails { - enum ProcessingGuarantees { - ATLEAST_ONCE = 0; // [default value] - ATMOST_ONCE = 1; - EFFECTIVELY_ONCE = 2; - } - enum SubscriptionType { - SHARED = 0; - EXCLUSIVE = 1; - FAILOVER = 2; - } enum Runtime { JAVA = 0; PYTHON = 1; @@ -42,24 +43,25 @@ message FunctionDetails { string namespace = 2; string name = 3; string className = 4; - // map from input topic name to serde - map<string, string> customSerdeInputs = 5; - string outputSerdeClassName = 6; - string output = 7; - string logTopic = 8; - ProcessingGuarantees processingGuarantees = 9; - map<string,string> userConfig = 10; - SubscriptionType subscriptionType = 11; - Runtime runtime = 12; - bool autoAck = 13; - repeated string inputs = 14; - int32 parallelism = 15; - ConnectorDetails source = 16; + string outputSerdeClassName = 5; + string output = 6; + string logTopic = 7; + ProcessingGuarantees processingGuarantees = 8; + map<string,string> userConfig = 9; + Runtime runtime = 10; + bool autoAck = 11; + int32 parallelism = 12; + SourceSpec source = 13; } -message ConnectorDetails { +message SourceSpec { string className = 1; - map<string, string> configs = 4; + // map in json format + string configs = 2; + + // configs used only when source feeds into functions + SubscriptionType subscriptionType = 3; + map<string,string> topicsToSerDeClassName = 4; } message PackageLocationMetaData { diff --git a/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionDetailsTest.java b/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionDetailsTest.java index c5f7dfa..31551ef 100644 --- a/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionDetailsTest.java +++ b/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionDetailsTest.java @@ -21,7 +21,7 @@ package org.apache.pulsar.functions.proto; import static org.testng.Assert.assertEquals; import org.apache.pulsar.functions.proto.Function.FunctionDetails; -import org.apache.pulsar.functions.proto.Function.FunctionDetails.ProcessingGuarantees; +import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; import org.testng.annotations.Test; /** diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java index cf5238f..ff6098c 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java @@ -30,7 +30,9 @@ import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.instance.InstanceConfig; -import org.apache.pulsar.functions.proto.Function.ConnectorDetails; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; +import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceControlGrpc; @@ -61,13 +63,6 @@ public class JavaInstanceMain { @Parameter(names = "--output_topic", description = "Output Topic Name\n") protected String outputTopicName; - @Parameter(names = "--custom_serde_input_topics", description = "Input Topics that need custom deserialization\n", required = false) - protected String customSerdeInputTopics; - @Parameter(names = "--custom_serde_classnames", description = "Input SerDe\n", required = false) - protected String customSerdeClassnames; - @Parameter(names = "--input_topics", description = "Input Topics\n", required = false) - protected String defaultSerdeInputTopics; - @Parameter(names = "--output_serde_classname", description = "Output SerDe\n") protected String outputSerdeClassName; @@ -75,7 +70,7 @@ public class JavaInstanceMain { protected String logTopic; @Parameter(names = "--processing_guarantees", description = "Processing Guarantees\n", required = true) - protected FunctionDetails.ProcessingGuarantees processingGuarantees; + protected ProcessingGuarantees processingGuarantees; @Parameter(names = "--instance_id", description = "Instance Id\n", required = true) protected String instanceId; @@ -104,15 +99,18 @@ public class JavaInstanceMain { @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n") protected String autoAck = "true"; - @Parameter(names = "--subscription_type", description = "What subscription type to use") - protected FunctionDetails.SubscriptionType subscriptionType; - - @Parameter(names = "--source_classname", description = "The source classname") + @Parameter(names = "--source_classname", description = "The source classname", required = true) protected String sourceClassname; - @Parameter(names = "--source_configs", description = "The source classname") + @Parameter(names = "--source_configs", description = "The source configs") protected String sourceConfigs; + @Parameter(names = "--source_subscription_type", description = "The source subscription type", required = true) + protected String sourceSubscriptionType; + + @Parameter(names = "--source_topics_serde_classname", description = "A map of topics to SerDe for the source", required = true) + protected String sourceTopicsSerdeClassName; + private Server server; @@ -130,22 +128,7 @@ public class JavaInstanceMain { functionDetailsBuilder.setNamespace(namespace); functionDetailsBuilder.setName(functionName); functionDetailsBuilder.setClassName(className); - if (defaultSerdeInputTopics != null) { - String[] inputTopics = defaultSerdeInputTopics.split(","); - for (String inputTopic : inputTopics) { - functionDetailsBuilder.addInputs(inputTopic); - } - } - if (customSerdeInputTopics != null && customSerdeClassnames != null) { - String[] inputTopics = customSerdeInputTopics.split(","); - String[] inputSerdeClassNames = customSerdeClassnames.split(","); - if (inputTopics.length != inputSerdeClassNames.length) { - throw new RuntimeException("Error specifying inputs"); - } - for (int i = 0; i < inputTopics.length; ++i) { - functionDetailsBuilder.putCustomSerdeInputs(inputTopics[i], inputSerdeClassNames[i]); - } - } + if (outputSerdeClassName != null) { functionDetailsBuilder.setOutputSerdeClassName(outputSerdeClassName); } @@ -161,20 +144,21 @@ public class JavaInstanceMain { } else { functionDetailsBuilder.setAutoAck(false); } - functionDetailsBuilder.setSubscriptionType(subscriptionType); if (userConfig != null && !userConfig.isEmpty()) { Type type = new TypeToken<Map<String, String>>(){}.getType(); Map<String, String> userConfigMap = new Gson().fromJson(userConfig, type); functionDetailsBuilder.putAllUserConfig(userConfigMap); } - ConnectorDetails.Builder sourceDetailsBuilder = ConnectorDetails.newBuilder(); + SourceSpec.Builder sourceDetailsBuilder = SourceSpec.newBuilder(); sourceDetailsBuilder.setClassName(sourceClassname); - if (sourceConfigs != null && !sourceConfigs.isEmpty()) { - Type type = new TypeToken<Map<String, String>>(){}.getType(); - Map<String, String> sourceConfigMap = new Gson().fromJson(sourceConfigs, type); - sourceDetailsBuilder.putAllConfigs(sourceConfigMap); + if (sourceConfigs != null && !sourceConfigs.isEmpty()) {; + sourceDetailsBuilder.setConfigs(sourceConfigs); } + sourceDetailsBuilder.setSubscriptionType(Function.SubscriptionType.valueOf(sourceSubscriptionType)); + + sourceDetailsBuilder.putAllTopicsToSerDeClassName(new Gson().fromJson(sourceTopicsSerdeClassName, Map.class)); + functionDetailsBuilder.setSource(sourceDetailsBuilder); FunctionDetails functionDetails = functionDetailsBuilder.build(); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java index 20407f8..bc49899 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java @@ -80,7 +80,7 @@ class ProcessRuntime implements Runtime { args.add("-Dlog4j.configurationFile=java_instance_log4j2.yml"); args.add("-Dpulsar.log.dir=" + logDirectory); args.add("-Dpulsar.log.file=" + instanceConfig.getFunctionDetails().getName()); - args.add("org.apache.pulsar.functions.runtime.JavaInstanceMain"); + args.add(JavaInstanceMain.class.getName()); args.add("--jar"); args.add(codeFile); } else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) { @@ -107,45 +107,11 @@ class ProcessRuntime implements Runtime { args.add(instanceConfig.getFunctionDetails().getName()); args.add("--function_classname"); args.add(instanceConfig.getFunctionDetails().getClassName()); - args.add("--subscription_type"); - args.add(instanceConfig.getFunctionDetails().getSubscriptionType().toString()); if (instanceConfig.getFunctionDetails().getLogTopic() != null && - !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) { + !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) { args.add("--log_topic"); args.add(instanceConfig.getFunctionDetails().getLogTopic()); } - if (instanceConfig.getFunctionDetails().getCustomSerdeInputsCount() > 0) { - String inputTopicString = ""; - String inputSerdeClassNameString = ""; - for (Map.Entry<String, String> entry : instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().entrySet()) { - if (inputTopicString.isEmpty()) { - inputTopicString = entry.getKey(); - } else { - inputTopicString = inputTopicString + "," + entry.getKey(); - } - if (inputSerdeClassNameString.isEmpty()) { - inputSerdeClassNameString = entry.getValue(); - } else { - inputSerdeClassNameString = inputSerdeClassNameString + "," + entry.getValue(); - } - } - args.add("--custom_serde_input_topics"); - args.add(inputTopicString); - args.add("--custom_serde_classnames"); - args.add(inputSerdeClassNameString); - } - if (instanceConfig.getFunctionDetails().getInputsCount() > 0) { - String inputTopicString = ""; - for (String topicName : instanceConfig.getFunctionDetails().getInputsList()) { - if (inputTopicString.isEmpty()) { - inputTopicString = topicName; - } else { - inputTopicString = inputTopicString + "," + topicName; - } - } - args.add("--input_topics"); - args.add(inputTopicString); - } args.add("--auto_ack"); if (instanceConfig.getFunctionDetails().getAutoAck()) { args.add("true"); @@ -176,16 +142,23 @@ class ProcessRuntime implements Runtime { instancePort = findAvailablePort(); args.add("--port"); args.add(String.valueOf(instancePort)); + if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) { - args.add("--source_classname"); - args.add(instanceConfig.getFunctionDetails().getSource().getClassName()); - Map<String, String> sourceConfigs = instanceConfig.getFunctionDetails().getSource().getConfigsMap(); + if (!instanceConfig.getFunctionDetails().getSource().getClassName().isEmpty()) { + args.add("--source_classname"); + args.add(instanceConfig.getFunctionDetails().getSource().getClassName()); + } + String sourceConfigs = instanceConfig.getFunctionDetails().getSource().getConfigs(); if (sourceConfigs != null && !sourceConfigs.isEmpty()) { - args.add("--source_config"); - args.add(new Gson().toJson(sourceConfigs)); + args.add("--source_configs"); + args.add(sourceConfigs); } } + args.add("--source_subscription_type"); + args.add(instanceConfig.getFunctionDetails().getSource().getSubscriptionType().toString()); + args.add("--source_topics_serde_classname"); + args.add(new Gson().toJson(instanceConfig.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap())); return args; } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index 189067a..96fb2c2 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -19,19 +19,20 @@ package org.apache.pulsar.functions.runtime; +import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; -import org.apache.pulsar.functions.runtime.ProcessRuntime; -import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; -import org.apache.pulsar.functions.runtime.ThreadRuntime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.testng.Assert.assertEquals; @@ -46,6 +47,10 @@ public class ProcessRuntimeTest { private static final String TEST_TENANT = "test-function-tenant"; private static final String TEST_NAMESPACE = "test-function-namespace"; private static final String TEST_NAME = "test-function-container"; + private static final Map<String, String> topicsToSerDeClassName = new HashMap<>(); + static { + topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", DefaultSerDe.class.getName()); + } private final ProcessRuntimeFactory factory; private final String userJarFile; @@ -76,12 +81,12 @@ public class ProcessRuntimeTest { functionDetailsBuilder.setNamespace(TEST_NAMESPACE); functionDetailsBuilder.setName(TEST_NAME); functionDetailsBuilder.setClassName("org.apache.pulsar.functions.utils.functioncache.AddFunction"); - functionDetailsBuilder.addInputs(TEST_NAME + "-input1"); - functionDetailsBuilder.addInputs(TEST_NAME + "-input2"); functionDetailsBuilder.setOutput(TEST_NAME + "-output"); functionDetailsBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer"); functionDetailsBuilder.setLogTopic(TEST_NAME + "-log"); - functionDetailsBuilder.setSource(Function.ConnectorDetails.newBuilder() + functionDetailsBuilder.setSource(Function.SourceSpec.newBuilder() + .setSubscriptionType(Function.SubscriptionType.FAILOVER) + .putAllTopicsToSerDeClassName(topicsToSerDeClassName) .setClassName("org.pulsar.pulsar.TestSource")); return functionDetailsBuilder.build(); } @@ -114,16 +119,16 @@ public class ProcessRuntimeTest { + " --namespace " + config.getFunctionDetails().getNamespace() + " --name " + config.getFunctionDetails().getName() + " --function_classname " + config.getFunctionDetails().getClassName() - + " --subscription_type " + config.getFunctionDetails().getSubscriptionType() + " --log_topic " + config.getFunctionDetails().getLogTopic() - + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + "-input2" + " --auto_ack false" + " --output_topic " + config.getFunctionDetails().getOutput() + " --output_serde_classname " + config.getFunctionDetails().getOutputSerdeClassName() + " --processing_guarantees ATLEAST_ONCE" + " --pulsar_serviceurl " + pulsarServiceUrl - + " --max_buffered_tuples 1024 --port " + args.get(42) - + " --source_classname " + config.getFunctionDetails().getSource().getClassName(); + + " --max_buffered_tuples 1024 --port " + args.get(38) + + " --source_classname " + config.getFunctionDetails().getSource().getClassName() + + " --source_subscription_type " + config.getFunctionDetails().getSource().getSubscriptionType().name() + + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName); assertEquals(expectedArgs, String.join(" ", args)); } @@ -142,15 +147,15 @@ public class ProcessRuntimeTest { + " --namespace " + config.getFunctionDetails().getNamespace() + " --name " + config.getFunctionDetails().getName() + " --function_classname " + config.getFunctionDetails().getClassName() - + " --subscription_type " + config.getFunctionDetails().getSubscriptionType() + " --log_topic " + config.getFunctionDetails().getLogTopic() - + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + "-input2" + " --auto_ack false" + " --output_topic " + config.getFunctionDetails().getOutput() + " --output_serde_classname " + config.getFunctionDetails().getOutputSerdeClassName() + " --processing_guarantees ATLEAST_ONCE" + " --pulsar_serviceurl " + pulsarServiceUrl - + " --max_buffered_tuples 1024 --port " + args.get(41); + + " --max_buffered_tuples 1024 --port " + args.get(37) + + " --source_subscription_type " + config.getFunctionDetails().getSource().getSubscriptionType().name() + + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName); assertEquals(expectedArgs, String.join(" ", args)); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index b855104..cb7d0a2 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -44,8 +44,16 @@ public class FunctionConfig { public enum SubscriptionType { SHARED, - EXCLUSIVE, - FAILOVER + FAILOVER; + + public org.apache.pulsar.client.api.SubscriptionType get() { + switch (this) { + case FAILOVER: + return org.apache.pulsar.client.api.SubscriptionType.Failover; + default: + return org.apache.pulsar.client.api.SubscriptionType.Shared; + } + } } public enum Runtime { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 6f98a42..04b6c70 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -54,6 +54,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; +import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.MembershipManager; @@ -447,17 +448,18 @@ public class FunctionsImpl { } @POST - @Path("/{tenant}/{namespace}/{functionName}/trigger") + @Path("/{tenant}/{namespace}/{functionName}/{topic}/trigger") @Consumes(MediaType.MULTIPART_FORM_DATA) public Response triggerFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("name") String functionName, + final @PathParam("topic") String topic, final @FormDataParam("data") String input, final @FormDataParam("dataStream") InputStream uploadedInputStream) { FunctionDetails functionDetails; // validate parameters try { - validateTriggerRequestParams(tenant, namespace, functionName, input, uploadedInputStream); + validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream); } catch (IllegalArgumentException e) { log.error("Invalid trigger function request @ /{}/{}/{}", tenant, namespace, functionName, e); @@ -476,11 +478,13 @@ public class FunctionsImpl { } FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName); + String inputTopicToWrite; - if (functionMetaData.getFunctionDetails().getInputsList().size() > 0) { - inputTopicToWrite = functionMetaData.getFunctionDetails().getInputsList().get(0); + // only if the source is PulsarSource + if (functionMetaData.getFunctionDetails().getSource().getClassName().equals(PulsarSource.class.getName())) { + inputTopicToWrite = topic; } else { - inputTopicToWrite = functionMetaData.getFunctionDetails().getCustomSerdeInputs().entrySet().iterator().next().getKey(); + return Response.status(Status.BAD_REQUEST).build(); } String outputTopic = functionMetaData.getFunctionDetails().getOutput(); Reader reader = null; @@ -667,8 +671,11 @@ public class FunctionsImpl { if (functionDetails.getClassName() == null || functionDetails.getClassName().isEmpty()) { missingFields.add("ClassName"); } - if (functionDetails.getInputsCount() == 0 && functionDetails.getCustomSerdeInputsCount() == 0) { - missingFields.add("Input"); + if (!functionDetails.getSource().isInitialized()) { + missingFields.add("Source"); + } + else if (functionDetails.getSource().getTopicsToSerDeClassNameMap().isEmpty()) { + missingFields.add("Source Topics Serde Map"); } if (!missingFields.isEmpty()) { String errorMessage = StringUtils.join(missingFields, ","); @@ -688,6 +695,7 @@ public class FunctionsImpl { private void validateTriggerRequestParams(String tenant, String namespace, String functionName, + String topic, String input, InputStream uploadedInputStream) { if (tenant == null) { @@ -699,6 +707,9 @@ public class FunctionsImpl { if (functionName == null) { throw new IllegalArgumentException("Function Name is not provided"); } + if (topic == null) { + throw new IllegalArgumentException("Topic Name is not provided"); + } if (uploadedInputStream == null && input == null) { throw new IllegalArgumentException("Trigger Data is not provided"); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 9ed70a4..12068d2 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -136,9 +136,10 @@ public class FunctionApiV2Resource extends FunctionApiResource { public Response triggerFunction(final @PathParam("tenant") String tenant, final @PathParam("namespace") String namespace, final @PathParam("name") String functionName, + final @PathParam("topic") String topic, final @FormDataParam("data") String input, final @FormDataParam("dataStream") InputStream uploadedInputStream) { - return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream); + return functions.triggerFunction(tenant, namespace, functionName, topic, input, uploadedInputStream); } @POST diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 5ae9e99..68be764 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -33,7 +33,9 @@ import com.google.gson.Gson; import com.google.common.collect.Lists; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; @@ -47,6 +49,9 @@ import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; +import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; +import org.apache.pulsar.functions.proto.Function.SourceSpec; +import org.apache.pulsar.functions.proto.Function.SubscriptionType; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.worker.FunctionMetaDataManager; @@ -87,10 +92,13 @@ public class FunctionApiV2ResourceTest { private static final String namespace = "test-namespace"; private static final String function = "test-function"; private static final String outputTopic = "test-output-topic"; - private static final String inputTopic = "test-input-topic"; - private static final String inputSerdeClassName = DefaultSerDe.class.getName(); private static final String outputSerdeClassName = DefaultSerDe.class.getName(); private static final String className = TestFunction.class.getName(); + private SubscriptionType subscriptionType = SubscriptionType.FAILOVER; + private static final Map<String, String> topicsToSerDeClassName = new HashMap<>(); + static { + topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", DefaultSerDe.class.getName()); + } private static final int parallelism = 1; private WorkerService mockedWorkerService; @@ -137,12 +145,10 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, + outputSerdeClassName, className, parallelism, - "Tenant"); + subscriptionType, topicsToSerDeClassName, "Tenant"); } @Test @@ -154,12 +160,10 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, + outputSerdeClassName, className, parallelism, - "Namespace"); + subscriptionType, topicsToSerDeClassName, "Namespace"); } @Test @@ -171,12 +175,10 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, + outputSerdeClassName, className, parallelism, - "Function Name"); + subscriptionType, topicsToSerDeClassName, "Function Name"); } @Test @@ -188,12 +190,10 @@ public class FunctionApiV2ResourceTest { null, mockedFormData, outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, + outputSerdeClassName, className, parallelism, - "Function Package"); + subscriptionType, topicsToSerDeClassName, "Function Package"); } @Test @@ -205,33 +205,14 @@ public class FunctionApiV2ResourceTest { mockedInputStream, null, outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, - className, - parallelism, - "Function Package"); - } - - @Test - public void testRegisterFunctionMissingInputTopic() throws IOException { - testRegisterFunctionMissingArguments( - tenant, - namespace, - function, - mockedInputStream, - mockedFormData, - outputTopic, - null, - inputSerdeClassName, - outputSerdeClassName, + outputSerdeClassName, className, parallelism, - "Input"); + subscriptionType, topicsToSerDeClassName, "Function Package"); } @Test - public void testRegisterFunctionMissingInputSerde() throws IOException { + public void testRegisterFunctionMissingClassName() throws IOException { testRegisterFunctionMissingArguments( tenant, namespace, @@ -239,33 +220,29 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, outputTopic, - inputTopic, + outputSerdeClassName, null, - outputSerdeClassName, - className, parallelism, - "Input"); + subscriptionType, topicsToSerDeClassName, "ClassName"); } @Test - public void testRegisterFunctionMissingClassName() throws IOException { + public void testRegisterFunctionMissingParallelism() throws IOException { testRegisterFunctionMissingArguments( - tenant, - namespace, - function, - mockedInputStream, - mockedFormData, - outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, - null, - parallelism, - "ClassName"); + tenant, + namespace, + function, + mockedInputStream, + mockedFormData, + outputTopic, + outputSerdeClassName, + className, + null, + subscriptionType, topicsToSerDeClassName, "parallelism"); } @Test - public void testRegisterFunctionMissingParallelism() throws IOException { + public void testRegisterFunctionMissingTopicsToSerDeClassName() throws IOException { testRegisterFunctionMissingArguments( tenant, namespace, @@ -273,28 +250,25 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, outputTopic, - inputTopic, - inputSerdeClassName, outputSerdeClassName, className, - null, - "parallelism"); + parallelism, + subscriptionType, null, "Source Topics Serde Map"); } private void testRegisterFunctionMissingArguments( - String tenant, - String namespace, - String function, - InputStream inputStream, - FormDataContentDisposition details, - String outputTopic, - String inputTopic, - String inputSerdeClassName, - String outputSerdeClassName, - String className, - Integer parallelism, - String missingFieldName - ) throws IOException { + String tenant, + String namespace, + String function, + InputStream inputStream, + FormDataContentDisposition details, + String outputTopic, + String outputSerdeClassName, + String className, + Integer parallelism, + SubscriptionType subscriptionType, + Map<String, String> topicToSerDeClassName, + String missingFieldName) throws IOException { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); if (tenant != null) { functionDetailsBuilder.setTenant(tenant); @@ -308,9 +282,6 @@ public class FunctionApiV2ResourceTest { if (outputTopic != null) { functionDetailsBuilder.setOutput(outputTopic); } - if (inputTopic != null && inputSerdeClassName != null) { - functionDetailsBuilder.putCustomSerdeInputs(inputTopic, inputSerdeClassName); - } if (outputSerdeClassName != null) { functionDetailsBuilder.setOutputSerdeClassName(outputSerdeClassName); } @@ -320,6 +291,14 @@ public class FunctionApiV2ResourceTest { if (parallelism != null) { functionDetailsBuilder.setParallelism(parallelism); } + if (subscriptionType != null) { + functionDetailsBuilder.setSource( + SourceSpec.newBuilder().setSubscriptionType(subscriptionType)); + } + if (topicToSerDeClassName != null) { + functionDetailsBuilder.setSource( + SourceSpec.newBuilder().putAllTopicsToSerDeClassName(topicToSerDeClassName)); + } FunctionDetails functionDetails = functionDetailsBuilder.build(); Response response = resource.registerFunction( @@ -341,10 +320,12 @@ public class FunctionApiV2ResourceTest { private Response registerDefaultFunction() throws IOException { FunctionDetails functionDetails = FunctionDetails.newBuilder() .setTenant(tenant).setNamespace(namespace).setName(function) - .setOutput(outputTopic).putCustomSerdeInputs(inputTopic, inputSerdeClassName) + .setOutput(outputTopic) .setOutputSerdeClassName(outputSerdeClassName) .setClassName(className) - .setParallelism(parallelism).build(); + .setParallelism(parallelism) + .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType) + .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build(); return resource.registerFunction( tenant, namespace, @@ -457,12 +438,10 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, + outputSerdeClassName, className, parallelism, - "Tenant"); + subscriptionType, topicsToSerDeClassName, "Tenant"); } @Test @@ -474,12 +453,10 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, + outputSerdeClassName, className, parallelism, - "Namespace"); + subscriptionType, topicsToSerDeClassName, "Namespace"); } @Test @@ -491,12 +468,10 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, + outputSerdeClassName, className, parallelism, - "Function Name"); + subscriptionType, topicsToSerDeClassName, "Function Name"); } @Test @@ -508,12 +483,10 @@ public class FunctionApiV2ResourceTest { null, mockedFormData, outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, + outputSerdeClassName, className, parallelism, - "Function Package"); + subscriptionType, topicsToSerDeClassName, "Function Package"); } @Test @@ -525,46 +498,10 @@ public class FunctionApiV2ResourceTest { mockedInputStream, null, outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, - className, - parallelism, - "Function Package"); - } - - @Test - public void testUpdateFunctionMissingSourceTopic() throws IOException { - testUpdateFunctionMissingArguments( - tenant, - namespace, - function, - mockedInputStream, - mockedFormData, - outputTopic, - null, - inputSerdeClassName, - outputSerdeClassName, - className, - parallelism, - "Input"); - } - - @Test - public void testUpdateFunctionMissingInputSerde() throws IOException { - testUpdateFunctionMissingArguments( - tenant, - namespace, - function, - mockedInputStream, - mockedFormData, - outputTopic, - inputTopic, - null, - outputSerdeClassName, + outputSerdeClassName, className, parallelism, - "Input"); + subscriptionType, topicsToSerDeClassName, "Function Package"); } @Test @@ -576,12 +513,10 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, outputTopic, - inputTopic, - inputSerdeClassName, - outputSerdeClassName, + outputSerdeClassName, null, parallelism, - "ClassName"); + subscriptionType, topicsToSerDeClassName, "ClassName"); } @Test public void testUpdateFunctionMissingParallelism() throws IOException { @@ -592,29 +527,40 @@ public class FunctionApiV2ResourceTest { mockedInputStream, mockedFormData, outputTopic, - inputTopic, - inputSerdeClassName, outputSerdeClassName, className, null, - "parallelism"); + subscriptionType, topicsToSerDeClassName, "parallelism"); } + @Test + public void testUpdateFunctionMissingTopicsToSerDeClassName() throws IOException { + testUpdateFunctionMissingArguments( + tenant, + namespace, + function, + mockedInputStream, + mockedFormData, + outputTopic, + outputSerdeClassName, + className, + parallelism, + subscriptionType, null, "Source Topics Serde Map"); + } private void testUpdateFunctionMissingArguments( - String tenant, - String namespace, - String function, - InputStream inputStream, - FormDataContentDisposition details, - String outputTopic, - String inputTopic, - String inputSerdeClassName, - String outputSerdeClassName, - String className, - Integer parallelism, - String missingFieldName - ) throws IOException { + String tenant, + String namespace, + String function, + InputStream inputStream, + FormDataContentDisposition details, + String outputTopic, + String outputSerdeClassName, + String className, + Integer parallelism, + SubscriptionType subscriptionType, + Map<String, String> topicToSerDeClassName, + String missingFieldName) throws IOException { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); if (tenant != null) { functionDetailsBuilder.setTenant(tenant); @@ -628,9 +574,6 @@ public class FunctionApiV2ResourceTest { if (outputTopic != null) { functionDetailsBuilder.setOutput(outputTopic); } - if (inputTopic != null && inputSerdeClassName != null) { - functionDetailsBuilder.putCustomSerdeInputs(inputTopic, inputSerdeClassName); - } if (outputSerdeClassName != null) { functionDetailsBuilder.setOutputSerdeClassName(outputSerdeClassName); } @@ -640,6 +583,14 @@ public class FunctionApiV2ResourceTest { if (parallelism != null) { functionDetailsBuilder.setParallelism(parallelism); } + if (subscriptionType != null) { + functionDetailsBuilder.setSource( + SourceSpec.newBuilder().setSubscriptionType(this.subscriptionType)); + } + if (topicToSerDeClassName != null) { + functionDetailsBuilder.setSource( + SourceSpec.newBuilder().putAllTopicsToSerDeClassName(topicToSerDeClassName)); + } FunctionDetails functionDetails = functionDetailsBuilder.build(); Response response = resource.updateFunction( @@ -661,10 +612,12 @@ public class FunctionApiV2ResourceTest { private Response updateDefaultFunction() throws IOException { FunctionDetails functionDetails = FunctionDetails.newBuilder() .setTenant(tenant).setNamespace(namespace).setName(function) - .setOutput(outputTopic).putCustomSerdeInputs(inputTopic, inputSerdeClassName) + .setOutput(outputTopic) .setOutputSerdeClassName(outputSerdeClassName) .setClassName(className) - .setParallelism(parallelism).build(); + .setParallelism(parallelism) + .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType) + .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build(); return resource.updateFunction( tenant, namespace, @@ -935,14 +888,15 @@ public class FunctionApiV2ResourceTest { FunctionDetails functionDetails = FunctionDetails.newBuilder() .setClassName(className) - .putCustomSerdeInputs(inputTopic, inputSerdeClassName) .setOutputSerdeClassName(outputSerdeClassName) .setName(function) .setNamespace(namespace) - .setProcessingGuarantees(FunctionDetails.ProcessingGuarantees.ATMOST_ONCE) + .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE) .setOutput(outputTopic) .setTenant(tenant) - .setParallelism(parallelism).build(); + .setParallelism(parallelism) + .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType) + .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build(); FunctionMetaData metaData = FunctionMetaData.newBuilder() .setCreateTime(System.currentTimeMillis()) .setFunctionDetails(functionDetails) -- To stop receiving notification emails like this one, please contact si...@apache.org.