This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e9a1b9a  additional refactoring to use source interface (#1681)
e9a1b9a is described below

commit e9a1b9a58b28687558b29705ea66047ef82fd05c
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon Apr 30 15:21:50 2018 -0700

    additional refactoring to use source interface (#1681)
    
    * additional refactoring to use source interface
    
    * removing PulsarConstants
    
    * remove unnecessary import
    
    * removing sink message for now
    
    * addressing comments
    
    * adding null check
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |   3 +-
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 148 +++++-----
 .../org/apache/pulsar/functions/api/Context.java   |   3 +-
 .../pulsar/functions/instance/ContextImpl.java     |   2 +-
 .../pulsar/functions/instance/JavaInstance.java    |   3 +-
 .../functions/instance/JavaInstanceRunnable.java   |  29 +-
 .../instance/processors/AtLeastOnceProcessor.java  |   5 +-
 .../instance/processors/AtMostOnceProcessor.java   |   5 +-
 .../processors/EffectivelyOnceProcessor.java       |  12 +-
 .../instance/processors/MessageProcessor.java      |  28 +-
 .../instance/processors/MessageProcessorBase.java  |  50 ++--
 .../{instance => source}/PulsarConfig.java         |  26 +-
 .../{instance => source}/PulsarRecord.java         |   2 +-
 .../{instance => source}/PulsarSource.java         |  63 ++++-
 .../instance/src/main/python/Function_pb2.py       | 305 ++++++++++-----------
 .../instance/src/main/python/python_instance.py    |  26 +-
 .../src/main/python/python_instance_main.py        |  35 ++-
 .../instance/JavaInstanceRunnableTest.java         |  45 ---
 .../functions/instance/JavaInstanceTest.java       |   4 +-
 .../pulsar/functions/source/PulsarSourceTest.java  | 136 +++++++++
 .../proto/src/main/proto/Function.proto            |  52 ++--
 .../functions/proto/FunctionDetailsTest.java       |   2 +-
 .../pulsar/functions/runtime/JavaInstanceMain.java |  56 ++--
 .../pulsar/functions/runtime/ProcessRuntime.java   |  55 +---
 .../functions/runtime/ProcessRuntimeTest.java      |  31 ++-
 .../pulsar/functions/utils/FunctionConfig.java     |  12 +-
 .../functions/worker/rest/api/FunctionsImpl.java   |  25 +-
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |   3 +-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 278 ++++++++-----------
 29 files changed, 731 insertions(+), 713 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index e5c050e..80e98af 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -237,11 +237,12 @@ public class FunctionsBase extends AdminResource 
implements Supplier<WorkerServi
     public Response triggerFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String 
namespace,
                                     final @PathParam("functionName") String 
functionName,
+                                    final @PathParam("topic") String topic,
                                     final @FormDataParam("data") String 
triggerValue,
                                     final @FormDataParam("dataStream") 
InputStream triggerStream) {
 
         return functions.triggerFunction(
-                tenant, namespace, functionName, triggerValue, triggerStream);
+                tenant, namespace, functionName, topic, triggerValue, 
triggerStream);
 
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 92bd757..e2f0a71 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -23,12 +23,15 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.isNull;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -49,7 +52,7 @@ import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.instance.PulsarSource;
+import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
@@ -59,8 +62,11 @@ import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
-import org.apache.pulsar.functions.shaded.proto.Function.ConnectorDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.SubscriptionType;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 
@@ -323,7 +329,7 @@ public class CmdFunctions extends CmdBase {
         }
 
         private void doJavaSubmitChecks(FunctionConfig functionConfig) {
-            if (isNull(className)) {
+            if (isNull(functionConfig.getClassName())) {
                 throw new IllegalArgumentException("You supplied a jar file 
but no main class");
             }
 
@@ -527,6 +533,68 @@ public class CmdFunctions extends CmdBase {
                 return 
functionConfig.getCustomSerdeInputs().keySet().iterator().next();
             }
         }
+
+        protected FunctionDetails convert(FunctionConfig functionConfig)
+                throws IOException {
+            FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
+
+            // Setup source
+            Map<String, String> topicToSerDeClassNameMap = new HashMap<>();
+            
topicToSerDeClassNameMap.putAll(functionConfig.getCustomSerdeInputs());
+            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
+            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                sourceSpecBuilder.setClassName(PulsarSource.class.getName());
+            }
+            functionConfig.getInputs().forEach(v -> 
topicToSerDeClassNameMap.put(v, ""));
+            
sourceSpecBuilder.putAllTopicsToSerDeClassName(topicToSerDeClassNameMap);
+            if (functionConfig.getSubscriptionType() != null) {
+                sourceSpecBuilder
+                        
.setSubscriptionType(convertSubscriptionType(functionConfig.getSubscriptionType()));
+            }
+            functionDetailsBuilder.setSource(sourceSpecBuilder);
+
+            if (functionConfig.getTenant() != null) {
+                functionDetailsBuilder.setTenant(functionConfig.getTenant());
+            }
+            if (functionConfig.getNamespace() != null) {
+                
functionDetailsBuilder.setNamespace(functionConfig.getNamespace());
+            }
+            if (functionConfig.getName() != null) {
+                functionDetailsBuilder.setName(functionConfig.getName());
+            }
+            if (functionConfig.getClassName() != null) {
+                
functionDetailsBuilder.setClassName(functionConfig.getClassName());
+            }
+            if (functionConfig.getOutput() != null) {
+                functionDetailsBuilder.setOutput(functionConfig.getOutput());
+            }
+            if (functionConfig.getOutputSerdeClassName() != null) {
+                
functionDetailsBuilder.setOutputSerdeClassName(functionConfig.getOutputSerdeClassName());
+            }
+            if (functionConfig.getLogTopic() != null) {
+                
functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
+            }
+            if (functionConfig.getRuntime() != null) {
+                
functionDetailsBuilder.setRuntime(convertRuntime(functionConfig.getRuntime()));
+            }
+            if (!functionConfig.getUserConfig().isEmpty()) {
+                
functionDetailsBuilder.putAllUserConfig(functionConfig.getUserConfig());
+            }
+            if (functionConfig.getProcessingGuarantees() != null) {
+                functionDetailsBuilder.setProcessingGuarantees(
+                        
convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
+            }
+            functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck());
+            
functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
+            return functionDetailsBuilder.build();
+        }
+
+        protected org.apache.pulsar.functions.proto.Function.FunctionDetails 
convertProto2(FunctionConfig functionConfig)
+                throws IOException {
+            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder 
functionDetailsBuilder = 
org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
+            Utils.mergeJson(FunctionsImpl.printJson(convert(functionConfig)), 
functionDetailsBuilder);
+            return functionDetailsBuilder.build();
+        }
     }
 
     @Parameters(commandDescription = "Run the Pulsar Function locally (rather 
than deploying it to the Pulsar cluster)")
@@ -871,64 +939,19 @@ public class CmdFunctions extends CmdBase {
             throw new IllegalArgumentException("You must specify one or more 
input topics for the function");
         }
     }
-    
-    private org.apache.pulsar.functions.proto.Function.FunctionDetails 
convertProto2(FunctionConfig functionConfig)
-            throws IOException {
-        org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder 
functionDetailsBuilder = 
org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-        Utils.mergeJson(FunctionsImpl.printJson(convert(functionConfig)), 
functionDetailsBuilder);
-        return functionDetailsBuilder.build();
-    }
 
-    private FunctionDetails convert(FunctionConfig functionConfig)
-            throws IOException {
-        FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
-        if (functionConfig.getInputs() != null) {
-            functionDetailsBuilder.setTenant(functionConfig.getTenant());
-        }
-        functionDetailsBuilder.setSource(
-                ConnectorDetails.newBuilder()
-                        .setClassName(PulsarSource.class.getName())
-                        .build());
-        if (functionConfig.getNamespace() != null) {
-            functionDetailsBuilder.setNamespace(functionConfig.getNamespace());
-        }
-        if (functionConfig.getName() != null) {
-            functionDetailsBuilder.setName(functionConfig.getName());
-        }
-        if (functionConfig.getClassName() != null) {
-            functionDetailsBuilder.setClassName(functionConfig.getClassName());
-        }
-        
functionDetailsBuilder.putAllCustomSerdeInputs(functionConfig.getCustomSerdeInputs());
-        if (functionConfig.getOutputSerdeClassName() != null) {
-            
functionDetailsBuilder.setOutputSerdeClassName(functionConfig.getOutputSerdeClassName());
-        }
-        if (functionConfig.getOutput() != null) {
-            functionDetailsBuilder.setOutput(functionConfig.getOutput());
-        }
-        if (functionConfig.getLogTopic() != null) {
-            functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
-        }
-        if (functionConfig.getProcessingGuarantees() != null) {
-            functionDetailsBuilder.setProcessingGuarantees(
-                    
convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
-        }
-        
functionDetailsBuilder.putAllUserConfig(functionConfig.getUserConfig());
-        if (functionConfig.getSubscriptionType() != null) {
-            functionDetailsBuilder.setSubscriptionType(
-                    
convertSubscriptionType(functionConfig.getSubscriptionType()));
-        }
-        if (functionConfig.getRuntime() != null) {
-            
functionDetailsBuilder.setRuntime(convertRuntime(functionConfig.getRuntime()));
+    private static FunctionDetails.Runtime 
convertRuntime(FunctionConfig.Runtime runtime) {
+        for (FunctionDetails.Runtime type : FunctionDetails.Runtime.values()) {
+            if (type.name().equals(runtime.name())) {
+                return type;
+            }
         }
-        functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck());
-        functionDetailsBuilder.addAllInputs(functionConfig.getInputs());
-        functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
-        return functionDetailsBuilder.build();
+        throw new RuntimeException("Unrecognized runtime: " + runtime.name());
     }
 
-    private static FunctionDetails.SubscriptionType convertSubscriptionType(
+    private static SubscriptionType convertSubscriptionType(
             FunctionConfig.SubscriptionType subscriptionType) {
-        for (FunctionDetails.SubscriptionType type : 
FunctionDetails.SubscriptionType.values()) {
+        for (SubscriptionType type : SubscriptionType.values()) {
             if (type.name().equals(subscriptionType.name())) {
                 return type;
             }
@@ -936,9 +959,9 @@ public class CmdFunctions extends CmdBase {
         throw new RuntimeException("Unrecognized subscription type: " + 
subscriptionType.name());
     }
 
-    private static FunctionDetails.ProcessingGuarantees 
convertProcessingGuarantee(
+    private static ProcessingGuarantees convertProcessingGuarantee(
             FunctionConfig.ProcessingGuarantees processingGuarantees) {
-        for (FunctionDetails.ProcessingGuarantees type : 
FunctionDetails.ProcessingGuarantees.values()) {
+        for (ProcessingGuarantees type : ProcessingGuarantees.values()) {
             if (type.name().equals(processingGuarantees.name())) {
                 return type;
             }
@@ -946,15 +969,6 @@ public class CmdFunctions extends CmdBase {
         throw new RuntimeException("Unrecognized processing guarantee: " + 
processingGuarantees.name());
     }
 
-    private static FunctionDetails.Runtime 
convertRuntime(FunctionConfig.Runtime runtime) {
-        for (FunctionDetails.Runtime type : FunctionDetails.Runtime.values()) {
-            if (type.name().equals(runtime.name())) {
-                return type;
-            }
-        }
-        throw new RuntimeException("Unrecognized runtime: " + runtime.name());
-    }
-
     private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig 
functionConfig) {
         String[] args = fqfn.split("/");
         if (args.length != 3) {
diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 653d176..01d8291 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -162,8 +162,7 @@ public interface Context {
      * By default acknowledgement management is done transparently by Pulsar 
Functions framework.
      * However users can disable that and do ack management by themselves by 
using this API.
      * @param messageId The messageId that needs to be acknowledged
-     * @param topic The topic name that the message belongs to that  needs to 
be acknowledged
      * @return A future that completes when the framework is done acking the 
message
      */
-    CompletableFuture<Void> ack(byte[] messageId, String topic);
+    CompletableFuture<Void> ack(byte[] messageId);
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 7a01d25..16a04f4 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -249,7 +249,7 @@ class ContextImpl implements Context {
 
     //TODO remove topic argument
     @Override
-    public CompletableFuture<Void> ack(byte[] messageId, String topic) {
+    public CompletableFuture<Void> ack(byte[] messageId) {
         MessageId actualMessageId = null;
         try {
             actualMessageId = MessageId.fromByteArray(messageId);
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 67f69bf..b78f5fc 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -27,8 +27,7 @@ import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
-import java.util.Map;
-
+import org.apache.pulsar.functions.source.PulsarSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d3853e2..e181204 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.processors.MessageProcessor;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
@@ -93,7 +94,6 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     private Exception deathException;
 
     @Getter(AccessLevel.PACKAGE)
-    private Map<String, SerDe> inputSerDe;
     private SerDe outputSerDe;
 
     @Getter(AccessLevel.PACKAGE)
@@ -159,7 +159,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         // start the output producer
         processor.setupOutput(outputSerDe);
         // start the input consumer
-        processor.setupInput(inputSerDe);
+        processor.setupInput(typeArgs[0]);
         // start any log topic handler
         setupLogHandler();
 
@@ -414,31 +414,6 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     }
 
     private void setupSerDe(Class<?>[] typeArgs, ClassLoader clsLoader) {
-
-        this.inputSerDe = new HashMap<>();
-        
instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().forEach((k, v) -> 
this.inputSerDe.put(k, InstanceUtils.initializeSerDe(v, clsLoader, 
typeArgs[0])));
-        for (String topicName : 
instanceConfig.getFunctionDetails().getInputsList()) {
-            this.inputSerDe.put(topicName, 
InstanceUtils.initializeDefaultSerDe(typeArgs[0]));
-        }
-
-        if (Void.class.equals(typeArgs[0])) {
-            throw new RuntimeException("Input type of Pulsar Function cannot 
be Void");
-        }
-
-        for (SerDe serDe : inputSerDe.values()) {
-            if 
(serDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
-                if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
-                    throw new RuntimeException("Default Serde does not support 
" + typeArgs[0]);
-                }
-            } else {
-                Class<?>[] inputSerdeTypeArgs = 
TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-                if (!typeArgs[0].isAssignableFrom(inputSerdeTypeArgs[0])) {
-                    throw new RuntimeException("Inconsistent types found 
between function input type and input serde type: "
-                            + " function type = " + typeArgs[0] + " should be 
assignable from " + inputSerdeTypeArgs[0]);
-                }
-            }
-        }
-
         if (!Void.class.equals(typeArgs[1])) { // return type is not 
`Void.class`
             if (instanceConfig.getFunctionDetails().getOutputSerdeClassName() 
== null
                     || 
instanceConfig.getFunctionDetails().getOutputSerdeClassName().isEmpty()
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
index 465d198..d51ae02 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
@@ -40,9 +40,8 @@ public class AtLeastOnceProcessor extends 
MessageProcessorBase {
     private Producer<byte[]> producer;
 
     AtLeastOnceProcessor(PulsarClient client,
-                         FunctionDetails functionDetails,
-                         SubscriptionType subType) {
-        super(client, functionDetails, subType);
+                         FunctionDetails functionDetails) {
+        super(client, functionDetails);
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
index 08cf111..30b1630 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
@@ -38,9 +38,8 @@ class AtMostOnceProcessor extends MessageProcessorBase {
     private Producer<byte[]> producer;
 
     AtMostOnceProcessor(PulsarClient client,
-                        FunctionDetails functionDetails,
-                        SubscriptionType subType) {
-        super(client, functionDetails, subType);
+                        FunctionDetails functionDetails) {
+        super(client, functionDetails);
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
index aafca69..06a463b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
@@ -30,7 +30,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.connect.core.Record;
-import org.apache.pulsar.functions.instance.PulsarRecord;
+import org.apache.pulsar.functions.source.PulsarRecord;
 import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.producers.Producers;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -46,15 +46,7 @@ class EffectivelyOnceProcessor extends MessageProcessorBase 
implements ConsumerE
 
     EffectivelyOnceProcessor(PulsarClient client,
                              FunctionDetails functionDetails) {
-        super(client, functionDetails, SubscriptionType.Failover);
-    }
-
-    /**
-     * An effectively-once processor can only use `Failover` subscription.
-     */
-    @Override
-    protected SubscriptionType getSubscriptionType() {
-        return SubscriptionType.Failover;
+        super(client, functionDetails);
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index fd22adb..0dcf12c 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -24,12 +24,11 @@ import 
org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.connect.core.Record;
 import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import 
org.apache.pulsar.functions.proto.Function.FunctionDetails.ProcessingGuarantees;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
 
 /**
  * A processor that processes messages, used by {@link 
org.apache.pulsar.functions.instance.JavaInstanceRunnable}.
@@ -39,16 +38,7 @@ public interface MessageProcessor extends AutoCloseable {
 
     static MessageProcessor create(PulsarClient client,
                                    FunctionDetails functionDetails) {
-        FunctionDetails.SubscriptionType fnSubType = 
functionDetails.getSubscriptionType();
         ProcessingGuarantees processingGuarantees = 
functionDetails.getProcessingGuarantees();
-        SubscriptionType subType;
-        if (FunctionDetails.SubscriptionType.SHARED == fnSubType) {
-            subType = SubscriptionType.Shared;
-        } else if (FunctionDetails.SubscriptionType.EXCLUSIVE == fnSubType) {
-            subType = SubscriptionType.Exclusive;
-        } else {
-            subType = SubscriptionType.Failover;
-        }
 
         if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
             return new EffectivelyOnceProcessor(
@@ -57,25 +47,23 @@ public interface MessageProcessor extends AutoCloseable {
         } else if (processingGuarantees == ProcessingGuarantees.ATMOST_ONCE) {
             return new AtMostOnceProcessor(
                 client,
-                functionDetails,
-                subType);
+                functionDetails);
         } else {
             return new AtLeastOnceProcessor(
                 client,
-                functionDetails,
-                subType);
+                functionDetails);
         }
     }
 
     void postReceiveMessage(Record record);
 
     /**
-     * Setup the input with a provided <i>processQueue</i>. The implementation 
of this processor is responsible for
-     * setting up the input and passing the received messages from input to 
the provided <i>processQueue</i>.
-     *
-     * @param inputSerDe SerDe to deserialize messages from input.
+     * Setup the source. Implementation is responsible for initializing the 
source
+     * and for calling open method for source
+     * @param inputType the input type of the function
+     * @throws Exception
      */
-    void setupInput(Map<String, SerDe> inputSerDe)
+    void setupInput(Class<?> inputType)
         throws Exception;
 
     /**
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index 06bc175..a2a5d8b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -18,10 +18,9 @@
  */
 package org.apache.pulsar.functions.instance.processors;
 
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
+import com.google.gson.Gson;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -31,9 +30,10 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.connect.core.Record;
 import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.PulsarConfig;
-import org.apache.pulsar.functions.instance.PulsarSource;
+import org.apache.pulsar.functions.source.PulsarConfig;
+import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
 
@@ -45,22 +45,15 @@ abstract class MessageProcessorBase implements 
MessageProcessor {
 
     protected final PulsarClient client;
     protected final FunctionDetails functionDetails;
-    protected final SubscriptionType subType;
 
     @Getter
     protected Source source;
 
-    protected List<String> topics;
 
     protected MessageProcessorBase(PulsarClient client,
-                                   FunctionDetails functionDetails,
-                                   SubscriptionType subType) {
+                                   FunctionDetails functionDetails) {
         this.client = client;
         this.functionDetails = functionDetails;
-        this.subType = subType;
-        this.topics = new LinkedList<>();
-        this.topics.addAll(this.functionDetails.getInputsList());
-        
this.topics.addAll(this.functionDetails.getCustomSerdeInputsMap().keySet());
     }
 
     //
@@ -68,26 +61,31 @@ abstract class MessageProcessorBase implements 
MessageProcessor {
     //
 
     @Override
-    public void setupInput(Map<String, SerDe> inputSerDe) throws Exception {
+    public void setupInput(Class<?> inputType) throws Exception {
 
-        org.apache.pulsar.functions.proto.Function.ConnectorDetails 
connectorDetails = this.functionDetails.getSource();
+        org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = 
this.functionDetails.getSource();
         Object object;
-        if 
(connectorDetails.getClassName().equals(PulsarSource.class.getName())) {
-            PulsarConfig pulsarConfig = PulsarConfig.builder()
-                    .topicToSerdeMap(inputSerDe)
-                    
.subscription(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails))
-                    
.processingGuarantees(this.functionDetails.getProcessingGuarantees())
-                    .subscriptionType(this.subType)
-                    .build();
+        if (sourceSpec.getClassName().equals(PulsarSource.class.getName())) {
+
+            PulsarConfig pulsarConfig = new PulsarConfig();
+            
pulsarConfig.setTopicSerdeClassNameMap(this.functionDetails.getSource().getTopicsToSerDeClassNameMap());
+            
pulsarConfig.setSubscriptionName(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails));
+            pulsarConfig.setProcessingGuarantees(
+                    
FunctionConfig.ProcessingGuarantees.valueOf(this.functionDetails.getProcessingGuarantees().name()));
+            pulsarConfig.setSubscriptionType(
+                    
FunctionConfig.SubscriptionType.valueOf(this.functionDetails.getSource().getSubscriptionType().name()));
+            pulsarConfig.setTypeClassName(inputType.getName());
+
             Object[] params = {this.client, pulsarConfig};
             Class[] paramTypes = {PulsarClient.class, PulsarConfig.class};
+
             object = Reflections.createInstance(
-                    connectorDetails.getClassName(),
+                    sourceSpec.getClassName(),
                     PulsarSource.class.getClassLoader(), params, paramTypes);
 
         } else {
             object = Reflections.createInstance(
-                    connectorDetails.getClassName(),
+                    sourceSpec.getClassName(),
                     Thread.currentThread().getContextClassLoader());
         }
 
@@ -101,7 +99,7 @@ abstract class MessageProcessorBase implements 
MessageProcessor {
         this.source = (Source) object;
 
         try {
-            this.source.open(connectorDetails.getConfigsMap());
+            this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), 
Map.class));
         } catch (Exception e) {
             log.info("Error occurred executing open for source: {}",
                     this.functionDetails.getSource().getClassName(), e);
@@ -109,10 +107,6 @@ abstract class MessageProcessorBase implements 
MessageProcessor {
 
     }
 
-    protected SubscriptionType getSubscriptionType() {
-        return subType;
-    }
-
     public Record recieveMessage() throws Exception {
         return this.source.read();
     }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
similarity index 58%
rename from 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
rename to 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
index c812a77..2a5dc44 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
@@ -16,27 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.instance;
+package org.apache.pulsar.functions.source;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.Builder;
 import lombok.Data;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 
+import java.io.IOException;
 import java.util.Map;
 
 @Getter
 @Setter
-@Data
-@Builder
 @ToString
 public class PulsarConfig {
-    private Function.FunctionDetails.ProcessingGuarantees processingGuarantees;
-    private SubscriptionType subscriptionType;
-    private String subscription;
-    private Map<String, SerDe> topicToSerdeMap;
+
+    private FunctionConfig.ProcessingGuarantees processingGuarantees;
+    private FunctionConfig.SubscriptionType subscriptionType;
+    private String subscriptionName;
+    private Map<String, String> topicSerdeClassNameMap;
+    private String typeClassName;
+
+    public static PulsarConfig load(Map<String, Object> map) throws 
IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
PulsarConfig.class);
+    }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
similarity index 97%
rename from 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
rename to 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 2bdbdb1..7211db1 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.instance;
+package org.apache.pulsar.functions.source;
 
 import lombok.Builder;
 import lombok.Data;
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
similarity index 62%
rename from 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
rename to 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 43d9350..caaa7bf 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -16,19 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.instance;
+package org.apache.pulsar.functions.source;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.connect.core.Record;
 import org.apache.pulsar.connect.core.Source;
-import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -37,6 +42,7 @@ public class PulsarSource<T> implements Source<T> {
 
     private PulsarClient pulsarClient;
     private PulsarConfig pulsarConfig;
+    private Map<String, SerDe> topicToSerDeMap = new HashMap<>();
 
     @Getter
     private org.apache.pulsar.client.api.Consumer inputConsumer;
@@ -48,10 +54,14 @@ public class PulsarSource<T> implements Source<T> {
 
     @Override
     public void open(Map<String, Object> config) throws Exception {
+        // Setup Serialization/Deserialization
+        setupSerde();
+
+        // Setup pulsar consumer
         this.inputConsumer = this.pulsarClient.newConsumer()
-                .topics(new 
ArrayList<>(this.pulsarConfig.getTopicToSerdeMap().keySet()))
-                .subscriptionName(this.pulsarConfig.getSubscription())
-                .subscriptionType(this.pulsarConfig.getSubscriptionType())
+                .topics(new 
ArrayList<>(this.pulsarConfig.getTopicSerdeClassNameMap().keySet()))
+                .subscriptionName(this.pulsarConfig.getSubscriptionName())
+                
.subscriptionType(this.pulsarConfig.getSubscriptionType().get())
                 .ackTimeout(1, TimeUnit.MINUTES)
                 .subscribe();
     }
@@ -71,13 +81,13 @@ public class PulsarSource<T> implements Source<T> {
             MessageIdImpl messageId = (MessageIdImpl) 
topicMessageId.getInnerMessageId();
             partitionId = Long.toString(messageId.getPartitionIndex());
         } else {
-            topicName = 
this.pulsarConfig.getTopicToSerdeMap().keySet().iterator().next();
+            topicName = 
this.pulsarConfig.getTopicSerdeClassNameMap().keySet().iterator().next();
             partitionId = Long.toString(((MessageIdImpl) 
message.getMessageId()).getPartitionIndex());
         }
 
         Object object;
         try {
-            object = 
this.pulsarConfig.getTopicToSerdeMap().get(topicName).deserialize(message.getData());
+            object = 
this.topicToSerDeMap.get(topicName).deserialize(message.getData());
         } catch (Exception e) {
             //TODO Add deserialization exception stats
             throw new RuntimeException("Error occured when attempting to 
deserialize input:", e);
@@ -97,15 +107,13 @@ public class PulsarSource<T> implements Source<T> {
                 .sequenceId(message.getSequenceId())
                 .topicName(topicName)
                 .ackFunction(() -> {
-                    if (pulsarConfig.getProcessingGuarantees()
-                            == 
Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    if (pulsarConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         inputConsumer.acknowledgeCumulativeAsync(message);
                     } else {
                         inputConsumer.acknowledgeAsync(message);
                     }
                 }).failFunction(() -> {
-                    if (pulsarConfig.getProcessingGuarantees()
-                            == 
Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    if (pulsarConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         throw new RuntimeException("Failed to process message: 
" + message.getMessageId());
                     }
                 })
@@ -117,4 +125,37 @@ public class PulsarSource<T> implements Source<T> {
     public void close() throws Exception {
         this.inputConsumer.close();
     }
+
+    private void setupSerde() throws ClassNotFoundException {
+
+        Class<?> typeArg = Class.forName(this.pulsarConfig.getTypeClassName());
+        if (Void.class.equals(typeArg)) {
+            throw new RuntimeException("Input type of Pulsar Function cannot 
be Void");
+        }
+
+        for (Map.Entry<String, String> entry : 
this.pulsarConfig.getTopicSerdeClassNameMap().entrySet()) {
+            String topic = entry.getKey();
+            String serDeClassname = entry.getValue();
+            if (serDeClassname.isEmpty()) {
+                serDeClassname = DefaultSerDe.class.getName();
+            }
+            SerDe serDe = InstanceUtils.initializeSerDe(serDeClassname,
+                    Thread.currentThread().getContextClassLoader(), typeArg);
+            this.topicToSerDeMap.put(topic, serDe);
+        }
+
+        for (SerDe serDe : this.topicToSerDeMap.values()) {
+            if 
(serDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
+                if (!DefaultSerDe.IsSupportedType(typeArg)) {
+                    throw new RuntimeException("Default Serde does not support 
" + typeArg);
+                }
+            } else {
+                Class<?>[] inputSerdeTypeArgs = 
TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
+                if (!typeArg.isAssignableFrom(inputSerdeTypeArgs[0])) {
+                    throw new RuntimeException("Inconsistent types found 
between function input type and input serde type: "
+                            + " function type = " + typeArg + " should be 
assignable from " + inputSerdeTypeArgs[0]);
+                }
+            }
+        }
+    }
 }
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py 
b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 0f65991..b988383 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -18,13 +17,12 @@
 # under the License.
 #
 
-# -*- encoding: utf-8 -*-
-
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: Function.proto
 
 import sys
 _b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf.internal import enum_type_wrapper
 from google.protobuf import descriptor as _descriptor
 from google.protobuf import message as _message
 from google.protobuf import reflection as _reflection
@@ -41,14 +39,12 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xd5\x06\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 
\x01(\t\x12H\n\x11\x63ustomSerdeInputs\x18\x05 
\x03(\x0b\x32-.proto.FunctionDetails.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06
 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12\x10\n\x08logTopic\x18\x08 
\x01(\t\x12I\n\x14processingGuarantees\x18 [...]
+  
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xf9\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01
 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 
\x01(\t\x12\x11\n\tclassName\x18\x04 
\x01(\t\x12\x1c\n\x14outputSerdeClassName\x18\x05 
\x01(\t\x12\x0e\n\x06output\x18\x06 \x01(\t\x12\x10\n\x08logTopic\x18\x07 
\x01(\t\x12\x39\n\x14processingGuarantees\x18\x08 
\x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12:\n\nuserConfig\x18\t 
\x03(\x0b\x32&.proto. [...]
 )
 
-
-
-_FUNCTIONDETAILS_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
+_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   name='ProcessingGuarantees',
-  full_name='proto.FunctionDetails.ProcessingGuarantees',
+  full_name='proto.ProcessingGuarantees',
   filename=None,
   file=DESCRIPTOR,
   values=[
@@ -67,14 +63,15 @@ _FUNCTIONDETAILS_PROCESSINGGUARANTEES = 
_descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=706,
-  serialized_end=785,
+  serialized_start=1187,
+  serialized_end=1266,
 )
-_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_PROCESSINGGUARANTEES)
+_sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
-_FUNCTIONDETAILS_SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
+ProcessingGuarantees = enum_type_wrapper.EnumTypeWrapper(_PROCESSINGGUARANTEES)
+_SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   name='SubscriptionType',
-  full_name='proto.FunctionDetails.SubscriptionType',
+  full_name='proto.SubscriptionType',
   filename=None,
   file=DESCRIPTOR,
   values=[
@@ -83,20 +80,24 @@ _FUNCTIONDETAILS_SUBSCRIPTIONTYPE = 
_descriptor.EnumDescriptor(
       options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
-      name='EXCLUSIVE', index=1, number=1,
-      options=None,
-      type=None),
-    _descriptor.EnumValueDescriptor(
-      name='FAILOVER', index=2, number=2,
+      name='FAILOVER', index=1, number=1,
       options=None,
       type=None),
   ],
   containing_type=None,
   options=None,
-  serialized_start=787,
-  serialized_end=846,
+  serialized_start=1268,
+  serialized_end=1312,
 )
-_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_SUBSCRIPTIONTYPE)
+_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
+
+SubscriptionType = enum_type_wrapper.EnumTypeWrapper(_SUBSCRIPTIONTYPE)
+ATLEAST_ONCE = 0
+ATMOST_ONCE = 1
+EFFECTIVELY_ONCE = 2
+SHARED = 0
+FAILOVER = 1
+
 
 _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
   name='Runtime',
@@ -115,49 +116,12 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=848,
-  serialized_end=879,
+  serialized_start=500,
+  serialized_end=531,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
 
 
-_FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY = _descriptor.Descriptor(
-  name='CustomSerdeInputsEntry',
-  full_name='proto.FunctionDetails.CustomSerdeInputsEntry',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='key', 
full_name='proto.FunctionDetails.CustomSerdeInputsEntry.key', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='value', 
full_name='proto.FunctionDetails.CustomSerdeInputsEntry.value', index=1,
-      number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), 
_b('8\001')),
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=597,
-  serialized_end=653,
-)
-
 _FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor(
   name='UserConfigEntry',
   full_name='proto.FunctionDetails.UserConfigEntry',
@@ -191,8 +155,8 @@ _FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=655,
-  serialized_end=704,
+  serialized_start=449,
+  serialized_end=498,
 )
 
 _FUNCTIONDETAILS = _descriptor.Descriptor(
@@ -231,85 +195,71 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='customSerdeInputs', 
full_name='proto.FunctionDetails.customSerdeInputs', index=4,
-      number=5, type=11, cpp_type=10, label=3,
-      has_default_value=False, default_value=[],
+      name='outputSerdeClassName', 
full_name='proto.FunctionDetails.outputSerdeClassName', index=4,
+      number=5, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='outputSerdeClassName', 
full_name='proto.FunctionDetails.outputSerdeClassName', index=5,
+      name='output', full_name='proto.FunctionDetails.output', index=5,
       number=6, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='output', full_name='proto.FunctionDetails.output', index=6,
+      name='logTopic', full_name='proto.FunctionDetails.logTopic', index=6,
       number=7, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='logTopic', full_name='proto.FunctionDetails.logTopic', index=7,
-      number=8, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='processingGuarantees', 
full_name='proto.FunctionDetails.processingGuarantees', index=8,
-      number=9, type=14, cpp_type=8, label=1,
+      name='processingGuarantees', 
full_name='proto.FunctionDetails.processingGuarantees', index=7,
+      number=8, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='userConfig', full_name='proto.FunctionDetails.userConfig', index=9,
-      number=10, type=11, cpp_type=10, label=3,
+      name='userConfig', full_name='proto.FunctionDetails.userConfig', index=8,
+      number=9, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='subscriptionType', 
full_name='proto.FunctionDetails.subscriptionType', index=10,
-      number=11, type=14, cpp_type=8, label=1,
+      name='runtime', full_name='proto.FunctionDetails.runtime', index=9,
+      number=10, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='runtime', full_name='proto.FunctionDetails.runtime', index=11,
-      number=12, type=14, cpp_type=8, label=1,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='autoAck', full_name='proto.FunctionDetails.autoAck', index=12,
-      number=13, type=8, cpp_type=7, label=1,
+      name='autoAck', full_name='proto.FunctionDetails.autoAck', index=10,
+      number=11, type=8, cpp_type=7, label=1,
       has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='inputs', full_name='proto.FunctionDetails.inputs', index=13,
-      number=14, type=9, cpp_type=9, label=3,
-      has_default_value=False, default_value=[],
+      name='parallelism', full_name='proto.FunctionDetails.parallelism', 
index=11,
+      number=12, type=5, cpp_type=1, label=1,
+      has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='parallelism', full_name='proto.FunctionDetails.parallelism', 
index=14,
-      number=15, type=5, cpp_type=1, label=1,
-      has_default_value=False, default_value=0,
+      name='source', full_name='proto.FunctionDetails.source', index=12,
+      number=13, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='source', full_name='proto.FunctionDetails.source', index=15,
-      number=16, type=11, cpp_type=10, label=1,
+      name='sink', full_name='proto.FunctionDetails.sink', index=13,
+      number=14, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
@@ -317,10 +267,8 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
   ],
   extensions=[
   ],
-  nested_types=[_FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY, 
_FUNCTIONDETAILS_USERCONFIGENTRY, ],
+  nested_types=[_FUNCTIONDETAILS_USERCONFIGENTRY, ],
   enum_types=[
-    _FUNCTIONDETAILS_PROCESSINGGUARANTEES,
-    _FUNCTIONDETAILS_SUBSCRIPTIONTYPE,
     _FUNCTIONDETAILS_RUNTIME,
   ],
   options=None,
@@ -330,26 +278,26 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=26,
-  serialized_end=879,
+  serialized_end=531,
 )
 
 
-_CONNECTORDETAILS_CONFIGSENTRY = _descriptor.Descriptor(
-  name='ConfigsEntry',
-  full_name='proto.ConnectorDetails.ConfigsEntry',
+_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
+  name='TopicsToSerDeClassNameEntry',
+  full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='key', full_name='proto.ConnectorDetails.ConfigsEntry.key', index=0,
+      name='key', 
full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.key', index=0,
       number=1, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='value', full_name='proto.ConnectorDetails.ConfigsEntry.value', 
index=1,
+      name='value', 
full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.value', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
@@ -367,26 +315,40 @@ _CONNECTORDETAILS_CONFIGSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=976,
-  serialized_end=1022,
+  serialized_start=714,
+  serialized_end=775,
 )
 
-_CONNECTORDETAILS = _descriptor.Descriptor(
-  name='ConnectorDetails',
-  full_name='proto.ConnectorDetails',
+_SOURCESPEC = _descriptor.Descriptor(
+  name='SourceSpec',
+  full_name='proto.SourceSpec',
   filename=None,
   file=DESCRIPTOR,
   containing_type=None,
   fields=[
     _descriptor.FieldDescriptor(
-      name='className', full_name='proto.ConnectorDetails.className', index=0,
+      name='className', full_name='proto.SourceSpec.className', index=0,
       number=1, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='configs', full_name='proto.ConnectorDetails.configs', index=1,
+      name='configs', full_name='proto.SourceSpec.configs', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', 
index=2,
+      number=3, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='topicsToSerDeClassName', 
full_name='proto.SourceSpec.topicsToSerDeClassName', index=3,
       number=4, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
@@ -395,7 +357,7 @@ _CONNECTORDETAILS = _descriptor.Descriptor(
   ],
   extensions=[
   ],
-  nested_types=[_CONNECTORDETAILS_CONFIGSENTRY, ],
+  nested_types=[_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY, ],
   enum_types=[
   ],
   options=None,
@@ -404,8 +366,46 @@ _CONNECTORDETAILS = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=882,
-  serialized_end=1022,
+  serialized_start=534,
+  serialized_end=775,
+)
+
+
+_SINKSPEC = _descriptor.Descriptor(
+  name='SinkSpec',
+  full_name='proto.SinkSpec',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='className', full_name='proto.SinkSpec.className', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='configs', full_name='proto.SinkSpec.configs', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=777,
+  serialized_end=823,
 )
 
 
@@ -435,8 +435,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1024,
-  serialized_end=1070,
+  serialized_start=825,
+  serialized_end=871,
 )
 
 
@@ -487,8 +487,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1073,
-  serialized_end=1234,
+  serialized_start=874,
+  serialized_end=1035,
 )
 
 
@@ -525,8 +525,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1236,
-  serialized_end=1317,
+  serialized_start=1037,
+  serialized_end=1118,
 )
 
 
@@ -563,44 +563,37 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1319,
-  serialized_end=1384,
+  serialized_start=1120,
+  serialized_end=1185,
 )
 
-_FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY.containing_type = _FUNCTIONDETAILS
 _FUNCTIONDETAILS_USERCONFIGENTRY.containing_type = _FUNCTIONDETAILS
-_FUNCTIONDETAILS.fields_by_name['customSerdeInputs'].message_type = 
_FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY
-_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = 
_FUNCTIONDETAILS_PROCESSINGGUARANTEES
+_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = 
_PROCESSINGGUARANTEES
 _FUNCTIONDETAILS.fields_by_name['userConfig'].message_type = 
_FUNCTIONDETAILS_USERCONFIGENTRY
-_FUNCTIONDETAILS.fields_by_name['subscriptionType'].enum_type = 
_FUNCTIONDETAILS_SUBSCRIPTIONTYPE
 _FUNCTIONDETAILS.fields_by_name['runtime'].enum_type = _FUNCTIONDETAILS_RUNTIME
-_FUNCTIONDETAILS.fields_by_name['source'].message_type = _CONNECTORDETAILS
-_FUNCTIONDETAILS_PROCESSINGGUARANTEES.containing_type = _FUNCTIONDETAILS
-_FUNCTIONDETAILS_SUBSCRIPTIONTYPE.containing_type = _FUNCTIONDETAILS
+_FUNCTIONDETAILS.fields_by_name['source'].message_type = _SOURCESPEC
+_FUNCTIONDETAILS.fields_by_name['sink'].message_type = _SINKSPEC
 _FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS
-_CONNECTORDETAILS_CONFIGSENTRY.containing_type = _CONNECTORDETAILS
-_CONNECTORDETAILS.fields_by_name['configs'].message_type = 
_CONNECTORDETAILS_CONFIGSENTRY
+_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC
+_SOURCESPEC.fields_by_name['subscriptionType'].enum_type = _SUBSCRIPTIONTYPE
+_SOURCESPEC.fields_by_name['topicsToSerDeClassName'].message_type = 
_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY
 _FUNCTIONMETADATA.fields_by_name['functionDetails'].message_type = 
_FUNCTIONDETAILS
 _FUNCTIONMETADATA.fields_by_name['packageLocation'].message_type = 
_PACKAGELOCATIONMETADATA
 _INSTANCE.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA
 _ASSIGNMENT.fields_by_name['instance'].message_type = _INSTANCE
 DESCRIPTOR.message_types_by_name['FunctionDetails'] = _FUNCTIONDETAILS
-DESCRIPTOR.message_types_by_name['ConnectorDetails'] = _CONNECTORDETAILS
+DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC
+DESCRIPTOR.message_types_by_name['SinkSpec'] = _SINKSPEC
 DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = 
_PACKAGELOCATIONMETADATA
 DESCRIPTOR.message_types_by_name['FunctionMetaData'] = _FUNCTIONMETADATA
 DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE
 DESCRIPTOR.message_types_by_name['Assignment'] = _ASSIGNMENT
+DESCRIPTOR.enum_types_by_name['ProcessingGuarantees'] = _PROCESSINGGUARANTEES
+DESCRIPTOR.enum_types_by_name['SubscriptionType'] = _SUBSCRIPTIONTYPE
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
 
 FunctionDetails = _reflection.GeneratedProtocolMessageType('FunctionDetails', 
(_message.Message,), dict(
 
-  CustomSerdeInputsEntry = 
_reflection.GeneratedProtocolMessageType('CustomSerdeInputsEntry', 
(_message.Message,), dict(
-    DESCRIPTOR = _FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY,
-    __module__ = 'Function_pb2'
-    # 
@@protoc_insertion_point(class_scope:proto.FunctionDetails.CustomSerdeInputsEntry)
-    ))
-  ,
-
   UserConfigEntry = 
_reflection.GeneratedProtocolMessageType('UserConfigEntry', 
(_message.Message,), dict(
     DESCRIPTOR = _FUNCTIONDETAILS_USERCONFIGENTRY,
     __module__ = 'Function_pb2'
@@ -612,23 +605,29 @@ FunctionDetails = 
_reflection.GeneratedProtocolMessageType('FunctionDetails', (_
   # @@protoc_insertion_point(class_scope:proto.FunctionDetails)
   ))
 _sym_db.RegisterMessage(FunctionDetails)
-_sym_db.RegisterMessage(FunctionDetails.CustomSerdeInputsEntry)
 _sym_db.RegisterMessage(FunctionDetails.UserConfigEntry)
 
-ConnectorDetails = 
_reflection.GeneratedProtocolMessageType('ConnectorDetails', 
(_message.Message,), dict(
+SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', 
(_message.Message,), dict(
 
-  ConfigsEntry = _reflection.GeneratedProtocolMessageType('ConfigsEntry', 
(_message.Message,), dict(
-    DESCRIPTOR = _CONNECTORDETAILS_CONFIGSENTRY,
+  TopicsToSerDeClassNameEntry = 
_reflection.GeneratedProtocolMessageType('TopicsToSerDeClassNameEntry', 
(_message.Message,), dict(
+    DESCRIPTOR = _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY,
     __module__ = 'Function_pb2'
-    # @@protoc_insertion_point(class_scope:proto.ConnectorDetails.ConfigsEntry)
+    # 
@@protoc_insertion_point(class_scope:proto.SourceSpec.TopicsToSerDeClassNameEntry)
     ))
   ,
-  DESCRIPTOR = _CONNECTORDETAILS,
+  DESCRIPTOR = _SOURCESPEC,
+  __module__ = 'Function_pb2'
+  # @@protoc_insertion_point(class_scope:proto.SourceSpec)
+  ))
+_sym_db.RegisterMessage(SourceSpec)
+_sym_db.RegisterMessage(SourceSpec.TopicsToSerDeClassNameEntry)
+
+SinkSpec = _reflection.GeneratedProtocolMessageType('SinkSpec', 
(_message.Message,), dict(
+  DESCRIPTOR = _SINKSPEC,
   __module__ = 'Function_pb2'
-  # @@protoc_insertion_point(class_scope:proto.ConnectorDetails)
+  # @@protoc_insertion_point(class_scope:proto.SinkSpec)
   ))
-_sym_db.RegisterMessage(ConnectorDetails)
-_sym_db.RegisterMessage(ConnectorDetails.ConfigsEntry)
+_sym_db.RegisterMessage(SinkSpec)
 
 PackageLocationMetaData = 
_reflection.GeneratedProtocolMessageType('PackageLocationMetaData', 
(_message.Message,), dict(
   DESCRIPTOR = _PACKAGELOCATIONMETADATA,
@@ -661,10 +660,8 @@ _sym_db.RegisterMessage(Assignment)
 
 DESCRIPTOR.has_options = True
 DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), 
_b('\n!org.apache.pulsar.functions.protoB\010Function'))
-_FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY.has_options = True
-_FUNCTIONDETAILS_CUSTOMSERDEINPUTSENTRY._options = 
_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 _FUNCTIONDETAILS_USERCONFIGENTRY.has_options = True
 _FUNCTIONDETAILS_USERCONFIGENTRY._options = 
_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
-_CONNECTORDETAILS_CONFIGSENTRY.has_options = True
-_CONNECTORDETAILS_CONFIGSENTRY._options = 
_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
+_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.has_options = True
+_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = 
_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 # @@protoc_insertion_point(module_scope)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 53ca3f0..947ff23 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -110,8 +110,8 @@ class PythonInstance(object):
     self.function_purefunction = None
     self.producer = None
     self.exeuction_thread = None
-    self.atmost_once = 
self.instance_config.function_details.processingGuarantees == 
Function_pb2.FunctionDetails.ProcessingGuarantees.Value('ATMOST_ONCE')
-    self.atleast_once = 
self.instance_config.function_details.processingGuarantees == 
Function_pb2.FunctionDetails.ProcessingGuarantees.Value('ATLEAST_ONCE')
+    self.atmost_once = 
self.instance_config.function_details.processingGuarantees == 
Function_pb2.ProcessingGuarantees.Value('ATMOST_ONCE')
+    self.atleast_once = 
self.instance_config.function_details.processingGuarantees == 
Function_pb2.ProcessingGuarantees.Value('ATLEAST_ONCE')
     self.auto_ack = self.instance_config.function_details.autoAck
     self.contextimpl = None
     self.total_stats = Stats()
@@ -120,27 +120,17 @@ class PythonInstance(object):
   def run(self):
     # Setup consumers and input deserializers
     mode = pulsar._pulsar.ConsumerType.Shared
-    if self.instance_config.function_details.subscriptionType == 
Function_pb2.FunctionDetails.SubscriptionType.Value('EXCLUSIVE'):
-      mode = pulsar._pulsar.ConsumerType.Exclusive
-    elif self.instance_config.function_details.subscriptionType == 
Function_pb2.FunctionDetails.SubscriptionType.Value('FAILOVER'):
+    if self.instance_config.function_details.source.subscriptionType == 
Function_pb2.SubscriptionType.Value("FAILOVER"):
       mode = pulsar._pulsar.ConsumerType.Failover
 
     subscription_name = str(self.instance_config.function_details.tenant) + 
"/" + \
                         str(self.instance_config.function_details.namespace) + 
"/" + \
                         str(self.instance_config.function_details.name)
-    for topic, serde in 
self.instance_config.function_details.customSerdeInputs.items():
-      serde_kclass = util.import_class(os.path.dirname(self.user_code), serde)
-      self.input_serdes[topic] = serde_kclass()
-      Log.info("Setting up consumer for topic %s with subname %s" % (topic, 
subscription_name))
-      self.consumers[topic] = self.pulsar_client.subscribe(
-        str(topic), subscription_name,
-        consumer_type=mode,
-        message_listener=partial(self.message_listener, topic, 
self.input_serdes[topic])
-      )
-
-    for topic in self.instance_config.function_details.inputs:
-      global DEFAULT_SERIALIZER
-      serde_kclass = util.import_class(os.path.dirname(self.user_code), 
DEFAULT_SERIALIZER)
+    for topic, serde in 
self.instance_config.function_details.source.topicsToSerDeClassName.items():
+      if not serde:
+        serde_kclass = util.import_class(os.path.dirname(self.user_code), 
DEFAULT_SERIALIZER)
+      else:
+        serde_kclass = util.import_class(os.path.dirname(self.user_code), 
serde)
       self.input_serdes[topic] = serde_kclass()
       Log.info("Setting up consumer for topic %s with subname %s" % (topic, 
subscription_name))
       self.consumers[topic] = self.pulsar_client.subscribe(
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 8a29dc5..59ebce6 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -58,16 +58,14 @@ def main():
   parser.add_argument('--name', required=True, help='Function Name')
   parser.add_argument('--tenant', required=True, help='Tenant Name')
   parser.add_argument('--namespace', required=True, help='Namespace name')
-  parser.add_argument('--custom_serde_input_topics', required=False, 
help='Input Topics Requiring Custom Deserialization')
-  parser.add_argument('--custom_serde_classnames', required=False, help='Input 
Serde Classnames')
-  parser.add_argument('--input_topics', required=False, help='Input topics 
with default serde')
+  parser.add_argument('--source_topics_serde_classname', required=True, 
help='A mapping of Input topics to SerDe')
   parser.add_argument('--output_topic', required=False, help='Output Topic')
   parser.add_argument('--output_serde_classname', required=False, help='Output 
Serde Classnames')
   parser.add_argument('--instance_id', required=True, help='Instance Id')
   parser.add_argument('--function_id', required=True, help='Function Id')
   parser.add_argument('--function_version', required=True, help='Function 
Version')
   parser.add_argument('--processing_guarantees', required=True, 
help='Processing Guarantees')
-  parser.add_argument('--subscription_type', required=True, help='Subscription 
Type')
+  parser.add_argument('--source_subscription_type', required=True, 
help='Subscription Type')
   parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar 
Service Url')
   parser.add_argument('--port', required=True, help='Instance Port', type=int)
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum 
number of Buffered tuples')
@@ -91,26 +89,25 @@ def main():
   function_details.namespace = args.namespace
   function_details.name = args.name
   function_details.className = args.function_classname
-  if args.custom_serde_input_topics is None and args.input_topics is None:
-    Log.critical("Atleast one input topic must be present")
+
+  sourceSpec = Function_pb2.SourceSpec()
+  sourceSpec.subscriptionType = 
Function_pb2.SubscriptionType.Value(args.source_subscription_type)
+  try:
+    source_topics_serde_classname_dict = 
json.loads(args.source_topics_serde_classname)
+  except ValueError:
+    log.critical("Cannot decode source_topics_serde_classname.  This argument 
must be specifed as a JSON")
     sys.exit(1)
-  if args.custom_serde_input_topics is not None and 
args.custom_serde_classnames is not None:
-    input_topics = args.custom_serde_input_topics.split(",")
-    input_serde = args.custom_serde_classnames.split(",")
-    if len(input_topics) != len(input_serde):
-      Log.critical("CustomSerde InputTopcis and Serde classnames should match")
-      sys.exit(1)
-    for i in xrange(len(input_topics)):
-      function_details.customSerdeInputs[input_topics[i]] = input_serde[i]
-  if args.input_topics is not None:
-    for topic in args.input_topics.split(","):
-      function_details.inputs.append(topic)
+  if not source_topics_serde_classname_dict:
+    log.critical("source_topics_serde_classname cannot be empty")
+  for topics, serde_classname in source_topics_serde_classname_dict.items():
+    sourceSpec.topicsToSerDeClassName[topics] = serde_classname
+  function_details.source.MergeFrom(sourceSpec)
+
   if args.output_topic != None and len(args.output_topic) != 0:
     function_details.output = args.output_topic
   if args.output_serde_classname != None and len(args.output_serde_classname) 
!= 0:
     function_details.outputSerdeClassName = args.output_serde_classname
-  function_details.processingGuarantees = 
Function_pb2.FunctionDetails.ProcessingGuarantees.Value(args.processing_guarantees)
-  function_details.subscriptionType = 
Function_pb2.FunctionDetails.SubscriptionType.Value(args.subscription_type)
+  function_details.processingGuarantees = 
Function_pb2.ProcessingGuarantees.Value(args.processing_guarantees)
   if args.auto_ack == "true":
     function_details.autoAck = True
   else:
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index a1672ba..898d777 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -49,11 +49,6 @@ public class JavaInstanceRunnableTest {
 
     private static InstanceConfig createInstanceConfig(boolean addCustom, 
String outputSerde) {
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
-        if (!addCustom) {
-            functionDetailsBuilder.addInputs("TEST");
-        } else {
-            functionDetailsBuilder.putCustomSerdeInputs("TEST", 
IntegerSerDe.class.getName());
-        }
         if (outputSerde != null) {
             functionDetailsBuilder.setOutputSerdeClassName(outputSerde);
         }
@@ -117,26 +112,6 @@ public class JavaInstanceRunnableTest {
     }
 
     /**
-     * Verify that JavaInstance does not support functions that take Void type 
as input
-     */
-    @Test
-    public void testVoidInputClasses() {
-        try {
-            JavaInstanceRunnable runnable = createRunnable(false, 
DefaultSerDe.class.getName());
-            Method method = makeAccessible(runnable);
-            VoidInputHandler pulsarFunction = new VoidInputHandler();
-            ClassLoader clsLoader = 
Thread.currentThread().getContextClassLoader();
-            Class<?>[] typeArgs = 
TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
-            method.invoke(runnable, typeArgs, clsLoader);
-            assertFalse(true);
-        } catch (InvocationTargetException ex) {
-            // Good
-        } catch (Exception ex) {
-            assertFalse(true);
-        }
-    }
-
-    /**
      * Verify that JavaInstance does support functions that output Void type
      */
     @Test
@@ -154,26 +129,6 @@ public class JavaInstanceRunnableTest {
     }
 
     /**
-     * Verify that function input type should be consistent with input serde 
type.
-     */
-    @Test
-    public void testInconsistentInputType() {
-        try {
-            JavaInstanceRunnable runnable = createRunnable(true, 
DefaultSerDe.class.getName());
-            Method method = makeAccessible(runnable);
-            ClassLoader clsLoader = 
Thread.currentThread().getContextClassLoader();
-            Function function = (Function<String, String>) (input, context) -> 
input + "-lambda";
-            Class<?>[] typeArgs = 
TypeResolver.resolveRawArguments(Function.class, function.getClass());
-            method.invoke(runnable, typeArgs, clsLoader);
-            fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
-        } catch (InvocationTargetException ex) {
-            assertTrue(ex.getCause().getMessage().startsWith("Inconsistent 
types found between function input type and input serde type:"));
-        } catch (Exception ex) {
-            assertTrue(false);
-        }
-    }
-
-    /**
      * Verify that Default Serializer works fine.
      */
     @Test
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index c0aae7c..e83a706 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -26,15 +26,13 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.source.PulsarSource;
 import org.testng.annotations.Test;
 
-import java.util.HashMap;
-
 public class JavaInstanceTest {
 
     private static InstanceConfig createInstanceConfig() {
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
-        functionDetailsBuilder.addInputs("TEST");
         
functionDetailsBuilder.setOutputSerdeClassName(DefaultSerDe.class.getName());
         InstanceConfig instanceConfig = new InstanceConfig();
         instanceConfig.setFunctionDetails(functionDetailsBuilder.build());
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
new file mode 100644
index 0000000..558517a
--- /dev/null
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
+
+@Slf4j
+public class PulsarSourceTest {
+
+    private static final String SUBSCRIPTION_NAME = 
"test/test-namespace/example";
+    private static Map<String, String> topicSerdeClassNameMap = new 
HashMap<>();
+    static {
+        
topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", 
DefaultSerDe.class.getName());
+    }
+
+    public static class TestSerDe implements SerDe<String> {
+
+        @Override
+        public String deserialize(byte[] input) {
+            return null;
+        }
+
+        @Override
+        public byte[] serialize(String input) {
+            return new byte[0];
+        }
+    }
+
+    /**
+     * Verify that JavaInstance does not support functions that take Void type 
as input
+     */
+
+    private static PulsarClient getPulsarClient() throws PulsarClientException 
{
+        PulsarClient pulsarClient = mock(PulsarClient.class);
+        ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
+        doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
+        
doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
+        
doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
+        doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), 
any());
+        Consumer consumer = mock(Consumer.class);
+        doReturn(consumer).when(consumerBuilder).subscribe();
+        doReturn(consumerBuilder).when(pulsarClient).newConsumer();
+        return pulsarClient;
+    }
+
+    private static PulsarConfig getPulsarConfigs() {
+        PulsarConfig pulsarConfig = new PulsarConfig();
+        
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        
pulsarConfig.setSubscriptionType(FunctionConfig.SubscriptionType.FAILOVER);
+        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        pulsarConfig.setTypeClassName(String.class.getName());
+        return pulsarConfig;
+    }
+
+    @Test
+    public void testVoidInputClasses() throws IOException {
+        PulsarConfig pulsarConfig = getPulsarConfigs();
+        // set type to void
+        pulsarConfig.setTypeClassName(Void.class.getName());
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+
+        try {
+            pulsarSource.open(new HashMap<>());
+            assertFalse(true);
+        } catch (RuntimeException ex) {
+            log.error("RuntimeException: {}", ex, ex);
+            assertEquals(ex.getMessage(), "Input type of Pulsar Function 
cannot be Void");
+        } catch (Exception ex) {
+            log.error("Exception: {}", ex, ex);
+            assertFalse(true);
+        }
+    }
+
+    /**
+     * Verify that function input type should be consistent with input serde 
type.
+     */
+    @Test
+    public void testInconsistentInputType() throws IOException {
+        PulsarConfig pulsarConfig = getPulsarConfigs();
+        // set type to be inconsistent to that of SerDe
+        pulsarConfig.setTypeClassName(Integer.class.getName());
+        Map<String, String> topicSerdeClassNameMap = new HashMap<>();
+        
topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", 
TestSerDe.class.getName());
+        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
+        try {
+            pulsarSource.open(new HashMap<>());
+            fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
+        } catch (RuntimeException ex) {
+            log.error("RuntimeException: {}", ex, ex);
+            assertTrue(ex.getMessage().startsWith("Inconsistent types found 
between function input type and input serde type:"));
+        } catch (Exception ex) {
+            log.error("Exception: {}", ex, ex);
+            assertTrue(false);
+        }
+    }
+}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index 30dd9b5..063b6aa 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -23,17 +23,18 @@ package proto;
 option java_package = "org.apache.pulsar.functions.proto";
 option java_outer_classname = "Function";
 
+enum ProcessingGuarantees {
+    ATLEAST_ONCE = 0; // [default value]
+    ATMOST_ONCE = 1;
+    EFFECTIVELY_ONCE = 2;
+}
+
+enum SubscriptionType {
+    SHARED = 0;
+    FAILOVER = 1;
+}
+
 message FunctionDetails {
-    enum ProcessingGuarantees {
-        ATLEAST_ONCE = 0; // [default value]
-        ATMOST_ONCE = 1;
-        EFFECTIVELY_ONCE = 2;
-    }
-    enum SubscriptionType {
-        SHARED = 0;
-        EXCLUSIVE = 1;
-        FAILOVER = 2;
-    }
     enum Runtime {
         JAVA = 0;
         PYTHON = 1;
@@ -42,24 +43,25 @@ message FunctionDetails {
     string namespace = 2;
     string name = 3;
     string className = 4;
-    // map from input topic name to serde
-    map<string, string> customSerdeInputs = 5;
-    string outputSerdeClassName = 6;
-    string output = 7;
-    string logTopic = 8;
-    ProcessingGuarantees processingGuarantees = 9;
-    map<string,string> userConfig = 10;
-    SubscriptionType subscriptionType = 11;
-    Runtime runtime = 12;
-    bool autoAck = 13;
-    repeated string inputs = 14;
-    int32 parallelism = 15;
-    ConnectorDetails source = 16;
+    string outputSerdeClassName = 5;
+    string output = 6;
+    string logTopic = 7;
+    ProcessingGuarantees processingGuarantees = 8;
+    map<string,string> userConfig = 9;
+    Runtime runtime = 10;
+    bool autoAck = 11;
+    int32 parallelism = 12;
+    SourceSpec source = 13;
 }
 
-message ConnectorDetails {
+message SourceSpec {
     string className = 1;
-    map<string, string> configs = 4;
+    // map in json format
+    string configs = 2;
+
+    // configs used only when source feeds into functions
+    SubscriptionType subscriptionType = 3;
+    map<string,string> topicsToSerDeClassName = 4;
 }
 
 message PackageLocationMetaData {
diff --git 
a/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionDetailsTest.java
 
b/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionDetailsTest.java
index c5f7dfa..31551ef 100644
--- 
a/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionDetailsTest.java
+++ 
b/pulsar-functions/proto/src/test/java/org/apache/pulsar/functions/proto/FunctionDetailsTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.functions.proto;
 import static org.testng.Assert.assertEquals;
 
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import 
org.apache.pulsar.functions.proto.Function.FunctionDetails.ProcessingGuarantees;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
 import org.testng.annotations.Test;
 
 /**
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index cf5238f..ff6098c 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -30,7 +30,9 @@ import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function.ConnectorDetails;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
@@ -61,13 +63,6 @@ public class JavaInstanceMain {
     @Parameter(names = "--output_topic", description = "Output Topic Name\n")
     protected String outputTopicName;
 
-    @Parameter(names = "--custom_serde_input_topics", description = "Input 
Topics that need custom deserialization\n", required = false)
-    protected String customSerdeInputTopics;
-    @Parameter(names = "--custom_serde_classnames", description = "Input 
SerDe\n", required = false)
-    protected String customSerdeClassnames;
-    @Parameter(names = "--input_topics", description = "Input Topics\n", 
required = false)
-    protected String defaultSerdeInputTopics;
-
     @Parameter(names = "--output_serde_classname", description = "Output 
SerDe\n")
     protected String outputSerdeClassName;
 
@@ -75,7 +70,7 @@ public class JavaInstanceMain {
     protected String logTopic;
 
     @Parameter(names = "--processing_guarantees", description = "Processing 
Guarantees\n", required = true)
-    protected FunctionDetails.ProcessingGuarantees processingGuarantees;
+    protected ProcessingGuarantees processingGuarantees;
 
     @Parameter(names = "--instance_id", description = "Instance Id\n", 
required = true)
     protected String instanceId;
@@ -104,15 +99,18 @@ public class JavaInstanceMain {
     @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n")
     protected String autoAck = "true";
 
-    @Parameter(names = "--subscription_type", description = "What subscription 
type to use")
-    protected FunctionDetails.SubscriptionType subscriptionType;
-
-    @Parameter(names = "--source_classname", description = "The source 
classname")
+    @Parameter(names = "--source_classname", description = "The source 
classname", required = true)
     protected String sourceClassname;
 
-    @Parameter(names = "--source_configs", description = "The source 
classname")
+    @Parameter(names = "--source_configs", description = "The source configs")
     protected String sourceConfigs;
 
+    @Parameter(names = "--source_subscription_type", description = "The source 
subscription type", required = true)
+    protected String sourceSubscriptionType;
+
+    @Parameter(names = "--source_topics_serde_classname", description = "A map 
of topics to SerDe for the source", required = true)
+    protected String sourceTopicsSerdeClassName;
+
 
     private Server server;
 
@@ -130,22 +128,7 @@ public class JavaInstanceMain {
         functionDetailsBuilder.setNamespace(namespace);
         functionDetailsBuilder.setName(functionName);
         functionDetailsBuilder.setClassName(className);
-        if (defaultSerdeInputTopics != null) {
-            String[] inputTopics = defaultSerdeInputTopics.split(",");
-            for (String inputTopic : inputTopics) {
-                functionDetailsBuilder.addInputs(inputTopic);
-            }
-        }
-        if (customSerdeInputTopics != null && customSerdeClassnames != null) {
-            String[] inputTopics = customSerdeInputTopics.split(",");
-            String[] inputSerdeClassNames = customSerdeClassnames.split(",");
-            if (inputTopics.length != inputSerdeClassNames.length) {
-                throw new RuntimeException("Error specifying inputs");
-            }
-            for (int i = 0; i < inputTopics.length; ++i) {
-                functionDetailsBuilder.putCustomSerdeInputs(inputTopics[i], 
inputSerdeClassNames[i]);
-            }
-        }
+
         if (outputSerdeClassName != null) {
             
functionDetailsBuilder.setOutputSerdeClassName(outputSerdeClassName);
         }
@@ -161,20 +144,21 @@ public class JavaInstanceMain {
         } else {
             functionDetailsBuilder.setAutoAck(false);
         }
-        functionDetailsBuilder.setSubscriptionType(subscriptionType);
         if (userConfig != null && !userConfig.isEmpty()) {
             Type type = new TypeToken<Map<String, String>>(){}.getType();
             Map<String, String> userConfigMap = new 
Gson().fromJson(userConfig, type);
             functionDetailsBuilder.putAllUserConfig(userConfigMap);
         }
 
-        ConnectorDetails.Builder sourceDetailsBuilder = 
ConnectorDetails.newBuilder();
+        SourceSpec.Builder sourceDetailsBuilder = SourceSpec.newBuilder();
         sourceDetailsBuilder.setClassName(sourceClassname);
-        if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
-            Type type = new TypeToken<Map<String, String>>(){}.getType();
-            Map<String, String> sourceConfigMap = new 
Gson().fromJson(sourceConfigs, type);
-            sourceDetailsBuilder.putAllConfigs(sourceConfigMap);
+        if (sourceConfigs != null && !sourceConfigs.isEmpty()) {;
+            sourceDetailsBuilder.setConfigs(sourceConfigs);
         }
+        
sourceDetailsBuilder.setSubscriptionType(Function.SubscriptionType.valueOf(sourceSubscriptionType));
+
+        sourceDetailsBuilder.putAllTopicsToSerDeClassName(new 
Gson().fromJson(sourceTopicsSerdeClassName, Map.class));
+
         functionDetailsBuilder.setSource(sourceDetailsBuilder);
 
         FunctionDetails functionDetails = functionDetailsBuilder.build();
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 20407f8..bc49899 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -80,7 +80,7 @@ class ProcessRuntime implements Runtime {
             args.add("-Dlog4j.configurationFile=java_instance_log4j2.yml");
             args.add("-Dpulsar.log.dir=" + logDirectory);
             args.add("-Dpulsar.log.file=" + 
instanceConfig.getFunctionDetails().getName());
-            args.add("org.apache.pulsar.functions.runtime.JavaInstanceMain");
+            args.add(JavaInstanceMain.class.getName());
             args.add("--jar");
             args.add(codeFile);
         } else if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.PYTHON) {
@@ -107,45 +107,11 @@ class ProcessRuntime implements Runtime {
         args.add(instanceConfig.getFunctionDetails().getName());
         args.add("--function_classname");
         args.add(instanceConfig.getFunctionDetails().getClassName());
-        args.add("--subscription_type");
-        
args.add(instanceConfig.getFunctionDetails().getSubscriptionType().toString());
         if (instanceConfig.getFunctionDetails().getLogTopic() != null &&
-            !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
+                !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
             args.add("--log_topic");
             args.add(instanceConfig.getFunctionDetails().getLogTopic());
         }
-        if (instanceConfig.getFunctionDetails().getCustomSerdeInputsCount() > 
0) {
-            String inputTopicString = "";
-            String inputSerdeClassNameString = "";
-            for (Map.Entry<String, String> entry : 
instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().entrySet()) {
-                if (inputTopicString.isEmpty()) {
-                    inputTopicString = entry.getKey();
-                } else {
-                    inputTopicString = inputTopicString + "," + entry.getKey();
-                }
-                if (inputSerdeClassNameString.isEmpty()) {
-                    inputSerdeClassNameString = entry.getValue();
-                } else {
-                    inputSerdeClassNameString = inputSerdeClassNameString + 
"," + entry.getValue();
-                }
-            }
-            args.add("--custom_serde_input_topics");
-            args.add(inputTopicString);
-            args.add("--custom_serde_classnames");
-            args.add(inputSerdeClassNameString);
-        }
-        if (instanceConfig.getFunctionDetails().getInputsCount() > 0) {
-            String inputTopicString = "";
-            for (String topicName : 
instanceConfig.getFunctionDetails().getInputsList()) {
-                if (inputTopicString.isEmpty()) {
-                    inputTopicString = topicName;
-                } else {
-                    inputTopicString = inputTopicString + "," + topicName;
-                }
-            }
-            args.add("--input_topics");
-            args.add(inputTopicString);
-        }
         args.add("--auto_ack");
         if (instanceConfig.getFunctionDetails().getAutoAck()) {
             args.add("true");
@@ -176,16 +142,23 @@ class ProcessRuntime implements Runtime {
         instancePort = findAvailablePort();
         args.add("--port");
         args.add(String.valueOf(instancePort));
+
         if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
-            args.add("--source_classname");
-            
args.add(instanceConfig.getFunctionDetails().getSource().getClassName());
-            Map<String, String> sourceConfigs = 
instanceConfig.getFunctionDetails().getSource().getConfigsMap();
+            if 
(!instanceConfig.getFunctionDetails().getSource().getClassName().isEmpty()) {
+                args.add("--source_classname");
+                
args.add(instanceConfig.getFunctionDetails().getSource().getClassName());
+            }
+            String sourceConfigs = 
instanceConfig.getFunctionDetails().getSource().getConfigs();
             if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
-                args.add("--source_config");
-                args.add(new Gson().toJson(sourceConfigs));
+                args.add("--source_configs");
+                args.add(sourceConfigs);
             }
         }
+        args.add("--source_subscription_type");
+        
args.add(instanceConfig.getFunctionDetails().getSource().getSubscriptionType().toString());
 
+        args.add("--source_topics_serde_classname");
+        args.add(new 
Gson().toJson(instanceConfig.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap()));
         return args;
     }
 
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 189067a..96fb2c2 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -19,19 +19,20 @@
 
 package org.apache.pulsar.functions.runtime;
 
+import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.runtime.ProcessRuntime;
-import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
-import org.apache.pulsar.functions.runtime.ThreadRuntime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.testng.Assert.assertEquals;
 
@@ -46,6 +47,10 @@ public class ProcessRuntimeTest {
     private static final String TEST_TENANT = "test-function-tenant";
     private static final String TEST_NAMESPACE = "test-function-namespace";
     private static final String TEST_NAME = "test-function-container";
+    private static final Map<String, String> topicsToSerDeClassName = new 
HashMap<>();
+    static {
+        
topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", 
DefaultSerDe.class.getName());
+    }
 
     private final ProcessRuntimeFactory factory;
     private final String userJarFile;
@@ -76,12 +81,12 @@ public class ProcessRuntimeTest {
         functionDetailsBuilder.setNamespace(TEST_NAMESPACE);
         functionDetailsBuilder.setName(TEST_NAME);
         
functionDetailsBuilder.setClassName("org.apache.pulsar.functions.utils.functioncache.AddFunction");
-        functionDetailsBuilder.addInputs(TEST_NAME + "-input1");
-        functionDetailsBuilder.addInputs(TEST_NAME + "-input2");
         functionDetailsBuilder.setOutput(TEST_NAME + "-output");
         
functionDetailsBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer");
         functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
-        functionDetailsBuilder.setSource(Function.ConnectorDetails.newBuilder()
+        functionDetailsBuilder.setSource(Function.SourceSpec.newBuilder()
+                .setSubscriptionType(Function.SubscriptionType.FAILOVER)
+                .putAllTopicsToSerDeClassName(topicsToSerDeClassName)
                 .setClassName("org.pulsar.pulsar.TestSource"));
         return functionDetailsBuilder.build();
     }
@@ -114,16 +119,16 @@ public class ProcessRuntimeTest {
                 + " --namespace " + config.getFunctionDetails().getNamespace()
                 + " --name " + config.getFunctionDetails().getName()
                 + " --function_classname " + 
config.getFunctionDetails().getClassName()
-                + " --subscription_type " + 
config.getFunctionDetails().getSubscriptionType()
                 + " --log_topic " + config.getFunctionDetails().getLogTopic()
-                + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + 
"-input2"
                 + " --auto_ack false"
                 + " --output_topic " + config.getFunctionDetails().getOutput()
                 + " --output_serde_classname " + 
config.getFunctionDetails().getOutputSerdeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(42)
-                + " --source_classname " + 
config.getFunctionDetails().getSource().getClassName();
+                + " --max_buffered_tuples 1024 --port " + args.get(38)
+                + " --source_classname " + 
config.getFunctionDetails().getSource().getClassName()
+                + " --source_subscription_type " + 
config.getFunctionDetails().getSource().getSubscriptionType().name()
+                + " --source_topics_serde_classname " + new 
Gson().toJson(topicsToSerDeClassName);
         assertEquals(expectedArgs, String.join(" ", args));
     }
 
@@ -142,15 +147,15 @@ public class ProcessRuntimeTest {
                 + " --namespace " + config.getFunctionDetails().getNamespace()
                 + " --name " + config.getFunctionDetails().getName()
                 + " --function_classname " + 
config.getFunctionDetails().getClassName()
-                + " --subscription_type " + 
config.getFunctionDetails().getSubscriptionType()
                 + " --log_topic " + config.getFunctionDetails().getLogTopic()
-                + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + 
"-input2"
                 + " --auto_ack false"
                 + " --output_topic " + config.getFunctionDetails().getOutput()
                 + " --output_serde_classname " + 
config.getFunctionDetails().getOutputSerdeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(41);
+                + " --max_buffered_tuples 1024 --port " + args.get(37)
+                + " --source_subscription_type " + 
config.getFunctionDetails().getSource().getSubscriptionType().name()
+                + " --source_topics_serde_classname " + new 
Gson().toJson(topicsToSerDeClassName);
         assertEquals(expectedArgs, String.join(" ", args));
     }
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index b855104..cb7d0a2 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -44,8 +44,16 @@ public class FunctionConfig {
 
     public enum SubscriptionType {
         SHARED,
-        EXCLUSIVE,
-        FAILOVER
+        FAILOVER;
+
+        public org.apache.pulsar.client.api.SubscriptionType get() {
+            switch (this) {
+                case FAILOVER:
+                    return 
org.apache.pulsar.client.api.SubscriptionType.Failover;
+                default:
+                    return 
org.apache.pulsar.client.api.SubscriptionType.Shared;
+            }
+        }
     }
 
     public enum Runtime {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 6f98a42..04b6c70 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -54,6 +54,7 @@ import 
org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.MembershipManager;
@@ -447,17 +448,18 @@ public class FunctionsImpl {
     }
 
     @POST
-    @Path("/{tenant}/{namespace}/{functionName}/trigger")
+    @Path("/{tenant}/{namespace}/{functionName}/{topic}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public Response triggerFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String 
namespace,
                                     final @PathParam("name") String 
functionName,
+                                    final @PathParam("topic") String topic,
                                     final @FormDataParam("data") String input,
                                     final @FormDataParam("dataStream") 
InputStream uploadedInputStream) {
         FunctionDetails functionDetails;
         // validate parameters
         try {
-            validateTriggerRequestParams(tenant, namespace, functionName, 
input, uploadedInputStream);
+            validateTriggerRequestParams(tenant, namespace, functionName, 
topic, input, uploadedInputStream);
         } catch (IllegalArgumentException e) {
             log.error("Invalid trigger function request @ /{}/{}/{}",
                     tenant, namespace, functionName, e);
@@ -476,11 +478,13 @@ public class FunctionsImpl {
         }
 
         FunctionMetaData functionMetaData = 
functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
+
         String inputTopicToWrite;
-        if (functionMetaData.getFunctionDetails().getInputsList().size() > 0) {
-            inputTopicToWrite = 
functionMetaData.getFunctionDetails().getInputsList().get(0);
+        // only if the source is PulsarSource
+        if 
(functionMetaData.getFunctionDetails().getSource().getClassName().equals(PulsarSource.class.getName()))
 {
+            inputTopicToWrite =  topic;
         } else {
-            inputTopicToWrite = 
functionMetaData.getFunctionDetails().getCustomSerdeInputs().entrySet().iterator().next().getKey();
+            return Response.status(Status.BAD_REQUEST).build();
         }
         String outputTopic = functionMetaData.getFunctionDetails().getOutput();
         Reader reader = null;
@@ -667,8 +671,11 @@ public class FunctionsImpl {
             if (functionDetails.getClassName() == null || 
functionDetails.getClassName().isEmpty()) {
                 missingFields.add("ClassName");
             }
-            if (functionDetails.getInputsCount() == 0 && 
functionDetails.getCustomSerdeInputsCount() == 0) {
-                missingFields.add("Input");
+            if (!functionDetails.getSource().isInitialized()) {
+                missingFields.add("Source");
+            }
+            else if 
(functionDetails.getSource().getTopicsToSerDeClassNameMap().isEmpty()) {
+                missingFields.add("Source Topics Serde Map");
             }
             if (!missingFields.isEmpty()) {
                 String errorMessage = StringUtils.join(missingFields, ",");
@@ -688,6 +695,7 @@ public class FunctionsImpl {
     private void validateTriggerRequestParams(String tenant,
                                               String namespace,
                                               String functionName,
+                                              String topic,
                                               String input,
                                               InputStream uploadedInputStream) 
{
         if (tenant == null) {
@@ -699,6 +707,9 @@ public class FunctionsImpl {
         if (functionName == null) {
             throw new IllegalArgumentException("Function Name is not 
provided");
         }
+        if (topic == null) {
+            throw new IllegalArgumentException("Topic Name is not provided");
+        }
         if (uploadedInputStream == null && input == null) {
             throw new IllegalArgumentException("Trigger Data is not provided");
         }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 9ed70a4..12068d2 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -136,9 +136,10 @@ public class FunctionApiV2Resource extends 
FunctionApiResource {
     public Response triggerFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String 
namespace,
                                     final @PathParam("name") String 
functionName,
+                                    final @PathParam("topic") String topic,
                                     final @FormDataParam("data") String input,
                                     final @FormDataParam("dataStream") 
InputStream uploadedInputStream) {
-        return functions.triggerFunction(tenant, namespace, functionName, 
input, uploadedInputStream);
+        return functions.triggerFunction(tenant, namespace, functionName, 
topic, input, uploadedInputStream);
     }
 
     @POST
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 5ae9e99..68be764 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -33,7 +33,9 @@ import com.google.gson.Gson;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
@@ -47,6 +49,9 @@ import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
@@ -87,10 +92,13 @@ public class FunctionApiV2ResourceTest {
     private static final String namespace = "test-namespace";
     private static final String function = "test-function";
     private static final String outputTopic = "test-output-topic";
-    private static final String inputTopic = "test-input-topic";
-    private static final String inputSerdeClassName = 
DefaultSerDe.class.getName();
     private static final String outputSerdeClassName = 
DefaultSerDe.class.getName();
     private static final String className = TestFunction.class.getName();
+    private SubscriptionType subscriptionType = SubscriptionType.FAILOVER;
+    private static final Map<String, String> topicsToSerDeClassName = new 
HashMap<>();
+    static {
+        
topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", 
DefaultSerDe.class.getName());
+    }
     private static final int parallelism = 1;
 
     private WorkerService mockedWorkerService;
@@ -137,12 +145,10 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             mockedFormData,
             outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
+                outputSerdeClassName,
             className,
             parallelism,
-            "Tenant");
+                subscriptionType, topicsToSerDeClassName, "Tenant");
     }
 
     @Test
@@ -154,12 +160,10 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             mockedFormData,
             outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
+                outputSerdeClassName,
             className,
             parallelism,
-            "Namespace");
+                subscriptionType, topicsToSerDeClassName, "Namespace");
     }
 
     @Test
@@ -171,12 +175,10 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             mockedFormData,
             outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
+                outputSerdeClassName,
             className,
             parallelism,
-            "Function Name");
+                subscriptionType, topicsToSerDeClassName, "Function Name");
     }
 
     @Test
@@ -188,12 +190,10 @@ public class FunctionApiV2ResourceTest {
             null,
             mockedFormData,
             outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
+                outputSerdeClassName,
             className,
             parallelism,
-            "Function Package");
+                subscriptionType, topicsToSerDeClassName, "Function Package");
     }
 
     @Test
@@ -205,33 +205,14 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             null,
             outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
-            className,
-            parallelism,
-            "Function Package");
-    }
-
-    @Test
-    public void testRegisterFunctionMissingInputTopic() throws IOException {
-        testRegisterFunctionMissingArguments(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            mockedFormData,
-            outputTopic,
-            null,
-            inputSerdeClassName,
-            outputSerdeClassName,
+                outputSerdeClassName,
             className,
             parallelism,
-            "Input");
+                subscriptionType, topicsToSerDeClassName, "Function Package");
     }
 
     @Test
-    public void testRegisterFunctionMissingInputSerde() throws IOException {
+    public void testRegisterFunctionMissingClassName() throws IOException {
         testRegisterFunctionMissingArguments(
             tenant,
             namespace,
@@ -239,33 +220,29 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             mockedFormData,
             outputTopic,
-            inputTopic,
+                outputSerdeClassName,
             null,
-            outputSerdeClassName,
-            className,
             parallelism,
-            "Input");
+                subscriptionType, topicsToSerDeClassName, "ClassName");
     }
 
     @Test
-    public void testRegisterFunctionMissingClassName() throws IOException {
+    public void testRegisterFunctionMissingParallelism() throws IOException {
         testRegisterFunctionMissingArguments(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            mockedFormData,
-            outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
-            null,
-            parallelism,
-            "ClassName");
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                null,
+                subscriptionType, topicsToSerDeClassName, "parallelism");
     }
 
     @Test
-    public void testRegisterFunctionMissingParallelism() throws IOException {
+    public void testRegisterFunctionMissingTopicsToSerDeClassName() throws 
IOException {
         testRegisterFunctionMissingArguments(
                 tenant,
                 namespace,
@@ -273,28 +250,25 @@ public class FunctionApiV2ResourceTest {
                 mockedInputStream,
                 mockedFormData,
                 outputTopic,
-                inputTopic,
-                inputSerdeClassName,
                 outputSerdeClassName,
                 className,
-                null,
-                "parallelism");
+                parallelism,
+                subscriptionType, null, "Source Topics Serde Map");
     }
 
     private void testRegisterFunctionMissingArguments(
-        String tenant,
-        String namespace,
-        String function,
-        InputStream inputStream,
-        FormDataContentDisposition details,
-        String outputTopic,
-        String inputTopic,
-        String inputSerdeClassName,
-        String outputSerdeClassName,
-        String className,
-        Integer parallelism,
-        String missingFieldName
-    ) throws IOException {
+            String tenant,
+            String namespace,
+            String function,
+            InputStream inputStream,
+            FormDataContentDisposition details,
+            String outputTopic,
+            String outputSerdeClassName,
+            String className,
+            Integer parallelism,
+            SubscriptionType subscriptionType,
+            Map<String, String> topicToSerDeClassName,
+            String missingFieldName) throws IOException {
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
         if (tenant != null) {
             functionDetailsBuilder.setTenant(tenant);
@@ -308,9 +282,6 @@ public class FunctionApiV2ResourceTest {
         if (outputTopic != null) {
             functionDetailsBuilder.setOutput(outputTopic);
         }
-        if (inputTopic != null && inputSerdeClassName != null) {
-            functionDetailsBuilder.putCustomSerdeInputs(inputTopic, 
inputSerdeClassName);
-        }
         if (outputSerdeClassName != null) {
             
functionDetailsBuilder.setOutputSerdeClassName(outputSerdeClassName);
         }
@@ -320,6 +291,14 @@ public class FunctionApiV2ResourceTest {
         if (parallelism != null) {
             functionDetailsBuilder.setParallelism(parallelism);
         }
+        if (subscriptionType != null) {
+            functionDetailsBuilder.setSource(
+                    
SourceSpec.newBuilder().setSubscriptionType(subscriptionType));
+        }
+        if (topicToSerDeClassName != null) {
+            functionDetailsBuilder.setSource(
+                    
SourceSpec.newBuilder().putAllTopicsToSerDeClassName(topicToSerDeClassName));
+        }
 
         FunctionDetails functionDetails = functionDetailsBuilder.build();
         Response response = resource.registerFunction(
@@ -341,10 +320,12 @@ public class FunctionApiV2ResourceTest {
     private Response registerDefaultFunction() throws IOException {
         FunctionDetails functionDetails = FunctionDetails.newBuilder()
                 .setTenant(tenant).setNamespace(namespace).setName(function)
-                .setOutput(outputTopic).putCustomSerdeInputs(inputTopic, 
inputSerdeClassName)
+                .setOutput(outputTopic)
                 .setOutputSerdeClassName(outputSerdeClassName)
                 .setClassName(className)
-                .setParallelism(parallelism).build();
+                .setParallelism(parallelism)
+                
.setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
+                        
.putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
         return resource.registerFunction(
             tenant,
             namespace,
@@ -457,12 +438,10 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             mockedFormData,
             outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
+                outputSerdeClassName,
             className,
             parallelism,
-            "Tenant");
+                subscriptionType, topicsToSerDeClassName, "Tenant");
     }
 
     @Test
@@ -474,12 +453,10 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             mockedFormData,
             outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
+                outputSerdeClassName,
             className,
             parallelism,
-            "Namespace");
+                subscriptionType, topicsToSerDeClassName, "Namespace");
     }
 
     @Test
@@ -491,12 +468,10 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             mockedFormData,
             outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
+                outputSerdeClassName,
             className,
             parallelism,
-            "Function Name");
+                subscriptionType, topicsToSerDeClassName, "Function Name");
     }
 
     @Test
@@ -508,12 +483,10 @@ public class FunctionApiV2ResourceTest {
             null,
             mockedFormData,
             outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
+                outputSerdeClassName,
             className,
             parallelism,
-            "Function Package");
+                subscriptionType, topicsToSerDeClassName, "Function Package");
     }
 
     @Test
@@ -525,46 +498,10 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             null,
             outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
-            className,
-            parallelism,
-            "Function Package");
-    }
-
-    @Test
-    public void testUpdateFunctionMissingSourceTopic() throws IOException {
-        testUpdateFunctionMissingArguments(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            mockedFormData,
-            outputTopic,
-            null,
-            inputSerdeClassName,
-            outputSerdeClassName,
-            className,
-            parallelism,
-            "Input");
-    }
-
-    @Test
-    public void testUpdateFunctionMissingInputSerde() throws IOException {
-        testUpdateFunctionMissingArguments(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            mockedFormData,
-            outputTopic,
-            inputTopic,
-            null,
-            outputSerdeClassName,
+                outputSerdeClassName,
             className,
             parallelism,
-            "Input");
+                subscriptionType, topicsToSerDeClassName, "Function Package");
     }
 
     @Test
@@ -576,12 +513,10 @@ public class FunctionApiV2ResourceTest {
             mockedInputStream,
             mockedFormData,
             outputTopic,
-            inputTopic,
-            inputSerdeClassName,
-            outputSerdeClassName,
+                outputSerdeClassName,
             null,
             parallelism,
-            "ClassName");
+                subscriptionType, topicsToSerDeClassName, "ClassName");
     }
     @Test
     public void testUpdateFunctionMissingParallelism() throws IOException {
@@ -592,29 +527,40 @@ public class FunctionApiV2ResourceTest {
                 mockedInputStream,
                 mockedFormData,
                 outputTopic,
-                inputTopic,
-                inputSerdeClassName,
                 outputSerdeClassName,
                 className,
                 null,
-                "parallelism");
+                subscriptionType, topicsToSerDeClassName, "parallelism");
     }
 
+    @Test
+    public void testUpdateFunctionMissingTopicsToSerDeClassName() throws 
IOException {
+        testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                mockedInputStream,
+                mockedFormData,
+                outputTopic,
+                outputSerdeClassName,
+                className,
+                parallelism,
+                subscriptionType, null, "Source Topics Serde Map");
+    }
 
     private void testUpdateFunctionMissingArguments(
-        String tenant,
-        String namespace,
-        String function,
-        InputStream inputStream,
-        FormDataContentDisposition details,
-        String outputTopic,
-        String inputTopic,
-        String inputSerdeClassName,
-        String outputSerdeClassName,
-        String className,
-        Integer parallelism,
-        String missingFieldName
-    ) throws IOException {
+            String tenant,
+            String namespace,
+            String function,
+            InputStream inputStream,
+            FormDataContentDisposition details,
+            String outputTopic,
+            String outputSerdeClassName,
+            String className,
+            Integer parallelism,
+            SubscriptionType subscriptionType,
+            Map<String, String> topicToSerDeClassName,
+            String missingFieldName) throws IOException {
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
         if (tenant != null) {
             functionDetailsBuilder.setTenant(tenant);
@@ -628,9 +574,6 @@ public class FunctionApiV2ResourceTest {
         if (outputTopic != null) {
             functionDetailsBuilder.setOutput(outputTopic);
         }
-        if (inputTopic != null && inputSerdeClassName != null) {
-            functionDetailsBuilder.putCustomSerdeInputs(inputTopic, 
inputSerdeClassName);
-        }
         if (outputSerdeClassName != null) {
             
functionDetailsBuilder.setOutputSerdeClassName(outputSerdeClassName);
         }
@@ -640,6 +583,14 @@ public class FunctionApiV2ResourceTest {
         if (parallelism != null) {
             functionDetailsBuilder.setParallelism(parallelism);
         }
+        if (subscriptionType != null) {
+            functionDetailsBuilder.setSource(
+                    
SourceSpec.newBuilder().setSubscriptionType(this.subscriptionType));
+        }
+        if (topicToSerDeClassName != null) {
+            functionDetailsBuilder.setSource(
+                    
SourceSpec.newBuilder().putAllTopicsToSerDeClassName(topicToSerDeClassName));
+        }
 
         FunctionDetails functionDetails = functionDetailsBuilder.build();
         Response response = resource.updateFunction(
@@ -661,10 +612,12 @@ public class FunctionApiV2ResourceTest {
     private Response updateDefaultFunction() throws IOException {
         FunctionDetails functionDetails = FunctionDetails.newBuilder()
                 .setTenant(tenant).setNamespace(namespace).setName(function)
-                .setOutput(outputTopic).putCustomSerdeInputs(inputTopic, 
inputSerdeClassName)
+                .setOutput(outputTopic)
                 .setOutputSerdeClassName(outputSerdeClassName)
                 .setClassName(className)
-                .setParallelism(parallelism).build();
+                .setParallelism(parallelism)
+                
.setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
+                        
.putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
         return resource.updateFunction(
             tenant,
             namespace,
@@ -935,14 +888,15 @@ public class FunctionApiV2ResourceTest {
 
         FunctionDetails functionDetails = FunctionDetails.newBuilder()
                 .setClassName(className)
-                .putCustomSerdeInputs(inputTopic, inputSerdeClassName)
                 .setOutputSerdeClassName(outputSerdeClassName)
                 .setName(function)
                 .setNamespace(namespace)
-                
.setProcessingGuarantees(FunctionDetails.ProcessingGuarantees.ATMOST_ONCE)
+                .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
                 .setOutput(outputTopic)
                 .setTenant(tenant)
-                .setParallelism(parallelism).build();
+                .setParallelism(parallelism)
+                
.setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
+                        
.putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
         FunctionMetaData metaData = FunctionMetaData.newBuilder()
             .setCreateTime(System.currentTimeMillis())
             .setFunctionDetails(functionDetails)

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to