This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cfa83a  refactoring functions to use source interface (#1649)
4cfa83a is described below

commit 4cfa83aeeee27e13b258065c19c0fc1488255145
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Thu Apr 26 13:37:30 2018 -0700

    refactoring functions to use source interface (#1649)
    
    * refactoring functions to use source interface
    
    * addressing comments
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   6 +
 .../org/apache/pulsar/connect/core/Source.java     |   8 +-
 pulsar-functions/instance/pom.xml                  |   6 +
 .../{InputMessage.java => InstanceUtils.java}      |  46 +++----
 .../pulsar/functions/instance/JavaInstance.java    |  11 +-
 .../functions/instance/JavaInstanceRunnable.java   | 139 ++++++++++-----------
 .../{InputMessage.java => PulsarConfig.java}       |  40 ++----
 .../{InputMessage.java => PulsarRecord.java}       |  45 ++++---
 .../pulsar/functions/instance/PulsarSource.java    | 120 ++++++++++++++++++
 .../instance/processors/AtLeastOnceProcessor.java  |  11 +-
 .../instance/processors/AtMostOnceProcessor.java   |  11 +-
 .../processors/EffectivelyOnceProcessor.java       |  43 +++----
 .../instance/processors/MessageProcessor.java      |  23 ++--
 .../instance/processors/MessageProcessorBase.java  |  97 +++++++-------
 .../functions/instance/JavaInstanceTest.java       |   3 +-
 .../proto/src/main/proto/Function.proto            |   6 +
 .../pulsar/functions/runtime/JavaInstanceMain.java |  18 +++
 .../pulsar/functions/runtime/ProcessRuntime.java   |   7 ++
 .../pulsar/functions/runtime/ThreadRuntime.java    |  11 +-
 .../functions/runtime/ProcessRuntimeTest.java      |  15 ++-
 .../functions/utils/FunctionDetailsUtils.java      |  11 --
 .../apache/pulsar/functions/utils/Reflections.java |  34 +++++
 22 files changed, 446 insertions(+), 265 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 0e4d8cc..c1446ed 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.instance.PulsarSource;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
@@ -54,6 +55,7 @@ import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
+import org.apache.pulsar.functions.shaded.proto.Function.ConnectorDetails;
 import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
@@ -837,6 +839,10 @@ public class CmdFunctions extends CmdBase {
         if (functionConfig.getInputs() != null) {
             functionDetailsBuilder.setTenant(functionConfig.getTenant());
         }
+        functionDetailsBuilder.setSource(
+                ConnectorDetails.newBuilder()
+                        .setClassName(PulsarSource.class.getName())
+                        .build());
         if (functionConfig.getNamespace() != null) {
             functionDetailsBuilder.setNamespace(functionConfig.getNamespace());
         }
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
index 40f1820..2a41336 100644
--- 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
@@ -27,12 +27,12 @@ public interface Source<T> extends AutoCloseable {
      * @param config initialization config
      * @throws Exception IO type exceptions when opening a connector
      */
-    void open(final Map<String, String> config) throws Exception;
+    void open(final Map<String, Object> config) throws Exception;
 
     /**
-     * Reads the next message from source, if one exists, and returns.  This 
call should be non-blocking.
-     * If source does not have any new messages, return null immediately.
-     * @return next message from source or null, if no new messages are 
available.
+     * Reads the next message from source.
+     * If source does not have any new messages, this call should block.
+     * @return next message from source.  The return result should never be 
null
      * @throws Exception
      */
     Record<T> read() throws Exception;
diff --git a/pulsar-functions/instance/pom.xml 
b/pulsar-functions/instance/pom.xml
index ba5183a..3e4cfa1 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -54,6 +54,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-connect-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>stream-storage-java-client</artifactId>
     </dependency>
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
similarity index 50%
copy from 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
copy to 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index bd7e788..7233659 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -18,39 +18,29 @@
  */
 package org.apache.pulsar.functions.instance;
 
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.utils.Reflections;
 
-@Getter
-@Setter
-@ToString
-public class InputMessage {
-
-    private Message actualMessage;
-    String topicName;
-    SerDe inputSerDe;
-    Consumer consumer;
-
-    public int getTopicPartition() {
-        MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId();
-        return msgId.getPartitionIndex();
-    }
-
-    public void ack() {
-        if (null != consumer) {
-            consumer.acknowledgeAsync(actualMessage);
+public class InstanceUtils {
+    public static SerDe initializeSerDe(String serdeClassName, ClassLoader 
clsLoader,
+                                        Class<?> type) {
+        if (null == serdeClassName || serdeClassName.isEmpty()) {
+            return null;
+        } else if (serdeClassName.equals(DefaultSerDe.class.getName())) {
+            return initializeDefaultSerDe(type);
+        } else {
+            return Reflections.createInstance(
+                    serdeClassName,
+                    SerDe.class,
+                    clsLoader);
         }
     }
 
-    public void ackCumulative() {
-        if (null != consumer) {
-            consumer.acknowledgeCumulativeAsync(actualMessage);
+    public static SerDe initializeDefaultSerDe(Class<?> type) {
+        if (!DefaultSerDe.IsSupportedType(type)) {
+            throw new RuntimeException("Default Serializer does not support " 
+ type);
         }
+        return new DefaultSerDe(type);
     }
-
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 2133c35..67f69bf 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.functions.instance;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
@@ -48,11 +48,16 @@ public class JavaInstance implements AutoCloseable {
     public JavaInstance(InstanceConfig config, Object userClassObject,
                  ClassLoader clsLoader,
                  PulsarClient pulsarClient,
-                 Consumer inputConsumer) {
+                 Source source) {
         // TODO: cache logger instances by functions?
         Logger instanceLog = LoggerFactory.getLogger("function-" + 
config.getFunctionDetails().getName());
 
-        this.context = new ContextImpl(config, instanceLog, pulsarClient, 
clsLoader, inputConsumer);
+        if (source instanceof PulsarSource) {
+            this.context = new ContextImpl(config, instanceLog, pulsarClient, 
clsLoader,
+                    ((PulsarSource) source).getInputConsumer());
+        } else {
+            this.context = null;
+        }
 
         // create the functions
         if (userClassObject instanceof Function) {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4349860..ce5220f 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -49,8 +50,10 @@ import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
 import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.connect.core.Record;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.processors.MessageProcessor;
@@ -88,7 +91,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     @Getter
     private Exception failureException;
     private JavaInstance javaInstance;
-    private volatile boolean running = true;
+    private AtomicBoolean running = new AtomicBoolean(true);
 
     @Getter(AccessLevel.PACKAGE)
     private Map<String, SerDe> inputSerDe;
@@ -101,6 +104,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     // function stats
     private final FunctionStats stats;
 
+    private Record currentRecord;
+
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
                                 String jarFile,
@@ -159,7 +164,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         // start any log topic handler
         setupLogHandler();
 
-        return new JavaInstance(instanceConfig, object, clsLoader, client, 
processor.getInputConsumer());
+        return new JavaInstance(instanceConfig, object, clsLoader, client, 
processor.getSource());
     }
 
     /**
@@ -169,9 +174,11 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     public void run() {
         try {
             javaInstance = setupJavaInstance();
-            while (running) {
+            while (running.get()) {
+
+                currentRecord = processor.recieveMessage();
 
-                InputMessage currentMessage = processor.recieveMessage();
+                processor.postReceiveMessage(currentRecord);
 
                 // state object is per function, because we need to have the 
ability to know what updates
                 // are made in this function and ensure we only acknowledge 
after the state is persisted.
@@ -184,20 +191,20 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                 }
 
                 // process the message
-                Object input;
-                try {
-                    input = 
currentMessage.getInputSerDe().deserialize(currentMessage.getActualMessage().getData());
-                } catch (Exception ex) {
-                    
stats.incrementDeserializationExceptions(currentMessage.getTopicName());
-                    throw ex;
-                }
                 long processAt = System.currentTimeMillis();
                 stats.incrementProcessed(processAt);
                 addLogTopicHandler();
-                JavaExecutionResult result = javaInstance.handleMessage(
-                        currentMessage.getActualMessage().getMessageId(),
-                        currentMessage.getTopicName(),
-                        input);
+                JavaExecutionResult result;
+                MessageId messageId = null;
+                String topicName = null;
+
+                if (currentRecord instanceof PulsarRecord) {
+                    PulsarRecord pulsarRecord = (PulsarRecord) currentRecord;
+                     messageId = pulsarRecord.getMessageId();
+                     topicName = pulsarRecord.getTopicName();
+                }
+                result = javaInstance.handleMessage(messageId, topicName, 
currentRecord.getValue());
+
                 removeLogTopicHandler();
 
                 long doneProcessing = System.currentTimeMillis();
@@ -209,18 +216,25 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                     try {
                         completableFuture.join();
                     } catch (Exception e) {
-                        log.error("Failed to flush the state updates of 
message {}", currentMessage, e);
-                        throw e;
+                        log.error("Failed to flush the state updates of 
message {}", currentRecord, e);
+                        currentRecord.fail();
                     }
                 }
-                processResult(currentMessage, result, processAt, 
doneProcessing);
+                try {
+                    processResult(currentRecord, result, processAt, 
doneProcessing);
+                } catch (Exception e) {
+                    log.warn("Failed to process result of message {}", 
currentRecord, e);
+                    currentRecord.fail();
+                }
             }
-
-            javaInstance.close();
         } catch (Exception ex) {
-            log.info("Uncaught exception in Java Instance", ex);
-            failureException = ex;
-            throw new RuntimeException(ex);
+            log.error("Uncaught exception in Java Instance", ex);
+            if (running.get()) {
+                failureException = ex;
+                throw new RuntimeException(ex);
+            }
+        } finally {
+            close();
         }
     }
 
@@ -286,15 +300,15 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         this.stateTable = result(storageClient.openTable(tableName));
     }
 
-    private void processResult(InputMessage msg,
+    private void processResult(Record srcRecord,
                                JavaExecutionResult result,
                                long startTime, long endTime) throws Exception {
         if (result.getUserException() != null) {
-            log.info("Encountered user exception when processing message {}", 
msg, result.getUserException());
+            log.info("Encountered user exception when processing message {}", 
srcRecord, result.getUserException());
             stats.incrementUserExceptions(result.getUserException());
-            throw result.getUserException();
+            this.currentRecord.fail();
         } else if (result.getSystemException() != null) {
-            log.info("Encountered system exception when processing message 
{}", msg, result.getSystemException());
+            log.info("Encountered system exception when processing message 
{}", srcRecord, result.getSystemException());
             stats.incrementSystemExceptions(result.getSystemException());
             throw result.getSystemException();
         } else {
@@ -308,35 +322,47 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                     throw ex;
                 }
                 if (output != null) {
-                    sendOutputMessage(msg, output);
+                    sendOutputMessage(srcRecord, output);
                 } else {
-                    processor.sendOutputMessage(msg, null);
+                    processor.sendOutputMessage(srcRecord, null);
                 }
             } else {
                 // the function doesn't produce any result or the user doesn't 
want the result.
-                processor.sendOutputMessage(msg, null);
+                processor.sendOutputMessage(srcRecord, null);
             }
         }
     }
 
-    private void sendOutputMessage(InputMessage srcMsg,
+    private void sendOutputMessage(Record srcRecord,
                                    byte[] output) throws Exception {
-        MessageBuilder msgBuilder = MessageBuilder.create()
-                .setContent(output)
-                .setProperty("__pfn_input_topic__", srcMsg.getTopicName())
-                .setProperty("__pfn_input_msg_id__", new 
String(Base64.getEncoder().encode(srcMsg.getActualMessage().getMessageId().toByteArray())));
 
-        processor.sendOutputMessage(srcMsg, msgBuilder);
+        MessageBuilder msgBuilder = MessageBuilder.create();
+        if (srcRecord instanceof PulsarRecord) {
+            PulsarRecord pulsarMessage = (PulsarRecord) srcRecord;
+            msgBuilder
+                    .setContent(output)
+                    .setProperty("__pfn_input_topic__", 
pulsarMessage.getTopicName())
+                    .setProperty("__pfn_input_msg_id__", new 
String(Base64.getEncoder().encode(pulsarMessage.getMessageId().toByteArray())));
+        }
+
+        processor.sendOutputMessage(srcRecord, msgBuilder);
+    }
+
+    /**
+     * Stop java instance runnable
+     */
+    public void stop() {
+        this.running.set(false);
     }
 
     @Override
     public void close() {
-        if (!running) {
+        if (!running.get()) {
             return;
         }
-        running = false;
 
         processor.close();
+        javaInstance.close();
 
         // kill the state table
         if (null != stateTable) {
@@ -401,39 +427,12 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         bldr.putMetrics(metricName, digest);
     }
 
-    private static SerDe initializeSerDe(String serdeClassName, ClassLoader 
clsLoader,
-                                         Class<?>[] typeArgs, boolean 
inputArgs) {
-        if (null == serdeClassName || serdeClassName.isEmpty()) {
-            return null;
-        } else if (serdeClassName.equals(DefaultSerDe.class.getName())) {
-            return initializeDefaultSerDe(typeArgs, inputArgs);
-        } else {
-            return Reflections.createInstance(
-                    serdeClassName,
-                    SerDe.class,
-                    clsLoader);
-        }
-    }
-
-    private static SerDe initializeDefaultSerDe(Class<?>[] typeArgs, boolean 
inputArgs) {
-        if (inputArgs) {
-            if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
-                throw new RuntimeException("Default Serializer does not 
support " + typeArgs[0]);
-            }
-            return new DefaultSerDe(typeArgs[0]);
-        } else {
-            if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
-                throw new RuntimeException("Default Serializer does not 
support " + typeArgs[1]);
-            }
-            return new DefaultSerDe(typeArgs[1]);
-        }
-    }
-
     private void setupSerDe(Class<?>[] typeArgs, ClassLoader clsLoader) {
+
         this.inputSerDe = new HashMap<>();
-        
instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().forEach((k, v) -> 
this.inputSerDe.put(k, initializeSerDe(v, clsLoader, typeArgs, true)));
+        
instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().forEach((k, v) -> 
this.inputSerDe.put(k, InstanceUtils.initializeSerDe(v, clsLoader, 
typeArgs[0])));
         for (String topicName : 
instanceConfig.getFunctionDetails().getInputsList()) {
-            this.inputSerDe.put(topicName, initializeDefaultSerDe(typeArgs, 
true));
+            this.inputSerDe.put(topicName, 
InstanceUtils.initializeDefaultSerDe(typeArgs[0]));
         }
 
         if (Void.class.equals(typeArgs[0])) {
@@ -458,9 +457,9 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
             if (instanceConfig.getFunctionDetails().getOutputSerdeClassName() 
== null
                     || 
instanceConfig.getFunctionDetails().getOutputSerdeClassName().isEmpty()
                     || 
instanceConfig.getFunctionDetails().getOutputSerdeClassName().equals(DefaultSerDe.class.getName()))
 {
-                outputSerDe = initializeDefaultSerDe(typeArgs, false);
+                outputSerDe = 
InstanceUtils.initializeDefaultSerDe(typeArgs[1]);
             } else {
-                this.outputSerDe = 
initializeSerDe(instanceConfig.getFunctionDetails().getOutputSerdeClassName(), 
clsLoader, typeArgs, false);
+                this.outputSerDe = 
InstanceUtils.initializeSerDe(instanceConfig.getFunctionDetails().getOutputSerdeClassName(),
 clsLoader, typeArgs[1]);
             }
             Class<?>[] outputSerdeTypeArgs = 
TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
             if 
(outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
similarity index 58%
copy from 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
copy to 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
index bd7e788..c812a77 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
@@ -18,39 +18,25 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import lombok.Builder;
+import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.proto.Function;
+
+import java.util.Map;
 
 @Getter
 @Setter
+@Data
+@Builder
 @ToString
-public class InputMessage {
-
-    private Message actualMessage;
-    String topicName;
-    SerDe inputSerDe;
-    Consumer consumer;
-
-    public int getTopicPartition() {
-        MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId();
-        return msgId.getPartitionIndex();
-    }
-
-    public void ack() {
-        if (null != consumer) {
-            consumer.acknowledgeAsync(actualMessage);
-        }
-    }
-
-    public void ackCumulative() {
-        if (null != consumer) {
-            consumer.acknowledgeCumulativeAsync(actualMessage);
-        }
-    }
-
+public class PulsarConfig {
+    private Function.FunctionDetails.ProcessingGuarantees processingGuarantees;
+    private SubscriptionType subscriptionType;
+    private String subscription;
+    private Map<String, SerDe> topicToSerdeMap;
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
similarity index 56%
rename from 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
rename to 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
index bd7e788..2bdbdb1 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
@@ -18,39 +18,36 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
-import lombok.Setter;
 import lombok.ToString;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.connect.core.Record;
 
+@Data
+@Builder
 @Getter
-@Setter
 @ToString
-public class InputMessage {
+@EqualsAndHashCode
+public class PulsarRecord<T> implements Record<T> {
 
-    private Message actualMessage;
-    String topicName;
-    SerDe inputSerDe;
-    Consumer consumer;
-
-    public int getTopicPartition() {
-        MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId();
-        return msgId.getPartitionIndex();
-    }
+    private String partitionId;
+    private Long sequenceId;
+    private T value;
+    private MessageId messageId;
+    private String topicName;
+    private Runnable failFunction;
+    private Runnable ackFunction;
 
+    @Override
     public void ack() {
-        if (null != consumer) {
-            consumer.acknowledgeAsync(actualMessage);
-        }
+        this.ackFunction.run();
     }
 
-    public void ackCumulative() {
-        if (null != consumer) {
-            consumer.acknowledgeCumulativeAsync(actualMessage);
-        }
+    @Override
+    public void fail() {
+        this.failFunction.run();
     }
-
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
new file mode 100644
index 0000000..43d9350
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
+import org.apache.pulsar.functions.proto.Function;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class PulsarSource<T> implements Source<T> {
+
+    private PulsarClient pulsarClient;
+    private PulsarConfig pulsarConfig;
+
+    @Getter
+    private org.apache.pulsar.client.api.Consumer inputConsumer;
+
+    public PulsarSource(PulsarClient pulsarClient, PulsarConfig pulsarConfig) {
+        this.pulsarClient = pulsarClient;
+        this.pulsarConfig = pulsarConfig;
+    }
+
+    @Override
+    public void open(Map<String, Object> config) throws Exception {
+        this.inputConsumer = this.pulsarClient.newConsumer()
+                .topics(new 
ArrayList<>(this.pulsarConfig.getTopicToSerdeMap().keySet()))
+                .subscriptionName(this.pulsarConfig.getSubscription())
+                .subscriptionType(this.pulsarConfig.getSubscriptionType())
+                .ackTimeout(1, TimeUnit.MINUTES)
+                .subscribe();
+    }
+
+    @Override
+    public Record<T> read() throws Exception {
+        org.apache.pulsar.client.api.Message<T> message = 
this.inputConsumer.receive();
+
+        String topicName;
+        String partitionId;
+
+        // If more than one topics are being read than the Message return by 
the consumer will be TopicMessageImpl
+        // If there is only topic being read then the Message returned by the 
consumer wil be MessageImpl
+        if (message instanceof TopicMessageImpl) {
+            topicName = ((TopicMessageImpl) message).getTopicName();
+            TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) 
message.getMessageId();
+            MessageIdImpl messageId = (MessageIdImpl) 
topicMessageId.getInnerMessageId();
+            partitionId = Long.toString(messageId.getPartitionIndex());
+        } else {
+            topicName = 
this.pulsarConfig.getTopicToSerdeMap().keySet().iterator().next();
+            partitionId = Long.toString(((MessageIdImpl) 
message.getMessageId()).getPartitionIndex());
+        }
+
+        Object object;
+        try {
+            object = 
this.pulsarConfig.getTopicToSerdeMap().get(topicName).deserialize(message.getData());
+        } catch (Exception e) {
+            //TODO Add deserialization exception stats
+            throw new RuntimeException("Error occured when attempting to 
deserialize input:", e);
+        }
+
+        T input;
+        try {
+            input = (T) object;
+        } catch (ClassCastException e) {
+            throw new RuntimeException("Error in casting input to expected 
type:", e);
+        }
+
+        PulsarRecord<T> pulsarMessage = (PulsarRecord<T>) 
PulsarRecord.builder()
+                .value(input)
+                .messageId(message.getMessageId())
+                .partitionId(partitionId)
+                .sequenceId(message.getSequenceId())
+                .topicName(topicName)
+                .ackFunction(() -> {
+                    if (pulsarConfig.getProcessingGuarantees()
+                            == 
Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                        inputConsumer.acknowledgeCumulativeAsync(message);
+                    } else {
+                        inputConsumer.acknowledgeAsync(message);
+                    }
+                }).failFunction(() -> {
+                    if (pulsarConfig.getProcessingGuarantees()
+                            == 
Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                        throw new RuntimeException("Failed to process message: 
" + message.getMessageId());
+                    }
+                })
+                .build();
+        return pulsarMessage;
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.inputConsumer.close();
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
index 86aeb83..465d198 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.instance.processors;
 
-import java.util.concurrent.LinkedBlockingDeque;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
@@ -27,7 +26,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.connect.core.Record;
 import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 
@@ -52,15 +51,17 @@ public class AtLeastOnceProcessor extends 
MessageProcessorBase {
     }
 
     @Override
-    public void sendOutputMessage(InputMessage inputMsg, 
MessageBuilder<byte[]> outputMsgBuilder) {
+    public void sendOutputMessage(Record srcRecord, MessageBuilder 
outputMsgBuilder) {
         if (null == outputMsgBuilder || null == producer) {
-            inputMsg.ack();
+            srcRecord.ack();
             return;
         }
 
         Message<byte[]> outputMsg = outputMsgBuilder.build();
         producer.sendAsync(outputMsg)
-            .thenAccept(msgId -> inputMsg.ack());
+            .thenAccept(msgId -> {
+                srcRecord.ack();
+            });
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
index 994be0d..08cf111 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.instance.processors;
 
-import java.util.concurrent.LinkedBlockingDeque;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
@@ -26,7 +25,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.connect.core.Record;
 import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 
@@ -45,10 +44,10 @@ class AtMostOnceProcessor extends MessageProcessorBase {
     }
 
     @Override
-    protected void postReceiveMessage(InputMessage message) {
-        super.postReceiveMessage(message);
+    public void postReceiveMessage(Record record) {
+        super.postReceiveMessage(record);
         if (functionDetails.getAutoAck()) {
-            message.ack();
+            record.ack();
         }
     }
 
@@ -58,7 +57,7 @@ class AtMostOnceProcessor extends MessageProcessorBase {
     }
 
     @Override
-    public void sendOutputMessage(InputMessage inputMsg, 
MessageBuilder<byte[]> outputMsgBuilder) {
+    public void sendOutputMessage(Record srcRecord, MessageBuilder 
outputMsgBuilder) {
         if (null == outputMsgBuilder) {
             return;
         }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
index 59c7bd6..aafca69 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
@@ -18,31 +18,22 @@
  */
 package org.apache.pulsar.functions.instance.processors;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerEventListener;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.functions.instance.PulsarRecord;
 import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.producers.Producers;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.Utils;
 
 /**
  * A message processor that process messages effectively-once.
@@ -50,8 +41,6 @@ import org.apache.pulsar.functions.utils.Utils;
 @Slf4j
 class EffectivelyOnceProcessor extends MessageProcessorBase implements 
ConsumerEventListener {
 
-    private LinkedList<String> inputTopicsToResubscribe = null;
-
     @Getter(AccessLevel.PACKAGE)
     protected Producers outputProducer;
 
@@ -103,24 +92,28 @@ class EffectivelyOnceProcessor extends 
MessageProcessorBase implements ConsumerE
     //
 
     @Override
-    public void sendOutputMessage(InputMessage inputMsg,
-                                  MessageBuilder<byte[]> outputMsgBuilder) 
throws Exception {
+    public void sendOutputMessage(Record srcRecord,
+                                  MessageBuilder outputMsgBuilder) throws 
Exception {
         if (null == outputMsgBuilder) {
-            inputMsg.ackCumulative();
+            srcRecord.ack();
             return;
         }
 
         // assign sequence id to output message for idempotent producing
         outputMsgBuilder = outputMsgBuilder
-            
.setSequenceId(Utils.getSequenceId(inputMsg.getActualMessage().getMessageId()));
-
-
-        Producer<byte[]> producer = 
outputProducer.getProducer(inputMsg.getTopicName(), 
inputMsg.getTopicPartition());
-
-        Message<byte[]> outputMsg = outputMsgBuilder.build();
-        producer.sendAsync(outputMsg)
-                .thenAccept(messageId -> inputMsg.ackCumulative())
-                .join();
+            .setSequenceId(srcRecord.getRecordSequence());
+
+        // currently on PulsarRecord
+        if (srcRecord instanceof PulsarRecord) {
+            PulsarRecord pulsarMessage = (PulsarRecord) srcRecord;
+            Producer producer = 
outputProducer.getProducer(pulsarMessage.getTopicName(),
+                    Integer.parseInt(srcRecord.getPartitionId()));
+
+            org.apache.pulsar.client.api.Message outputMsg = 
outputMsgBuilder.build();
+            producer.sendAsync(outputMsg)
+                    .thenAccept(messageId -> srcRecord.ack())
+                    .join();
+        }
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 97c971d..fd22adb 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -19,16 +19,15 @@
 package org.apache.pulsar.functions.instance.processors;
 
 import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
+
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.InputMessage;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import 
org.apache.pulsar.functions.proto.Function.FunctionDetails.ProcessingGuarantees;
 
@@ -68,6 +67,8 @@ public interface MessageProcessor extends AutoCloseable {
         }
     }
 
+    void postReceiveMessage(Record record);
+
     /**
      * Setup the input with a provided <i>processQueue</i>. The implementation 
of this processor is responsible for
      * setting up the input and passing the received messages from input to 
the provided <i>processQueue</i>.
@@ -78,11 +79,11 @@ public interface MessageProcessor extends AutoCloseable {
         throws Exception;
 
     /**
-     * Return the input.
+     * Return the source.
      *
-     * @return the input consumer.
+     * @return the source.
      */
-    Consumer<byte[]> getInputConsumer();
+    Source getSource();
 
     /**
      * Setup the output with a provided <i>outputSerDe</i>. The implementation 
of this processor is responsible for
@@ -99,18 +100,18 @@ public interface MessageProcessor extends AutoCloseable {
      * <p>If the <i>outputMsgBuilder</i> is null, the implementation doesn't 
have to send any messages to the output.
      * The implementation can decide to acknowledge the input message based on 
its process guarantees.
      *
-     * @param inputMsg input message
+     * @param srcRecord record from source
      * @param outputMsgBuilder output message builder. it can be null.
      */
-    void sendOutputMessage(InputMessage inputMsg,
-                           MessageBuilder<byte[]> outputMsgBuilder) throws 
PulsarClientException, Exception;
+    void sendOutputMessage(Record srcRecord,
+                           MessageBuilder outputMsgBuilder) throws 
PulsarClientException, Exception;
 
     /**
      * Get the next message to process
      * @return the next input message
      * @throws Exception
      */
-    InputMessage recieveMessage() throws Exception;
+    Record recieveMessage() throws Exception;
 
     @Override
     void close();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index fc37b13..06bc175 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -25,18 +25,17 @@ import java.util.Map;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
+import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.functions.instance.PulsarConfig;
+import org.apache.pulsar.functions.instance.PulsarSource;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.Reflections;
 
 /**
  * The base implementation of {@link MessageProcessor}.
@@ -48,12 +47,8 @@ abstract class MessageProcessorBase implements 
MessageProcessor {
     protected final FunctionDetails functionDetails;
     protected final SubscriptionType subType;
 
-    protected Map<String, SerDe> inputSerDe;
-
-    protected SerDe outputSerDe;
-
     @Getter
-    protected Consumer<byte[]> inputConsumer;
+    protected Source source;
 
     protected List<String> topics;
 
@@ -64,6 +59,8 @@ abstract class MessageProcessorBase implements 
MessageProcessor {
         this.functionDetails = functionDetails;
         this.subType = subType;
         this.topics = new LinkedList<>();
+        this.topics.addAll(this.functionDetails.getInputsList());
+        
this.topics.addAll(this.functionDetails.getCustomSerdeInputsMap().keySet());
     }
 
     //
@@ -72,36 +69,52 @@ abstract class MessageProcessorBase implements 
MessageProcessor {
 
     @Override
     public void setupInput(Map<String, SerDe> inputSerDe) throws Exception {
-        log.info("Setting up input with input serdes: {}", inputSerDe);
-        this.inputSerDe = inputSerDe;
-        
this.topics.addAll(this.functionDetails.getCustomSerdeInputsMap().keySet());
-        this.topics.addAll(this.functionDetails.getInputsList());
 
-        this.inputConsumer = this.client.newConsumer()
-                .topics(this.topics)
-                
.subscriptionName(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails))
-                .subscriptionType(getSubscriptionType())
-                .subscribe();
+        org.apache.pulsar.functions.proto.Function.ConnectorDetails 
connectorDetails = this.functionDetails.getSource();
+        Object object;
+        if 
(connectorDetails.getClassName().equals(PulsarSource.class.getName())) {
+            PulsarConfig pulsarConfig = PulsarConfig.builder()
+                    .topicToSerdeMap(inputSerDe)
+                    
.subscription(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails))
+                    
.processingGuarantees(this.functionDetails.getProcessingGuarantees())
+                    .subscriptionType(this.subType)
+                    .build();
+            Object[] params = {this.client, pulsarConfig};
+            Class[] paramTypes = {PulsarClient.class, PulsarConfig.class};
+            object = Reflections.createInstance(
+                    connectorDetails.getClassName(),
+                    PulsarSource.class.getClassLoader(), params, paramTypes);
+
+        } else {
+            object = Reflections.createInstance(
+                    connectorDetails.getClassName(),
+                    Thread.currentThread().getContextClassLoader());
+        }
+
+        Class<?>[] typeArgs;
+        if (object instanceof Source) {
+            typeArgs = TypeResolver.resolveRawArguments(Source.class, 
object.getClass());
+            assert typeArgs.length > 0;
+        } else {
+            throw new RuntimeException("Source does not implement correct 
interface");
+        }
+        this.source = (Source) object;
+
+        try {
+            this.source.open(connectorDetails.getConfigsMap());
+        } catch (Exception e) {
+            log.info("Error occurred executing open for source: {}",
+                    this.functionDetails.getSource().getClassName(), e);
+        }
+
     }
 
     protected SubscriptionType getSubscriptionType() {
         return subType;
     }
 
-    public InputMessage recieveMessage() throws PulsarClientException {
-        Message message = this.inputConsumer.receive();
-        String topicName;
-        if (message instanceof TopicMessageImpl) {
-            topicName = ((TopicMessageImpl)message).getTopicName();
-        } else {
-            topicName = this.topics.get(0);
-        }
-        InputMessage inputMessage = new InputMessage();
-                inputMessage.setConsumer(inputConsumer);
-                inputMessage.setInputSerDe(inputSerDe.get(topicName));
-                inputMessage.setActualMessage(message);
-                inputMessage.setTopicName(topicName);
-        return inputMessage;
+    public Record recieveMessage() throws Exception {
+        return this.source.read();
     }
 
     /**
@@ -110,9 +123,10 @@ abstract class MessageProcessorBase implements 
MessageProcessor {
      * <p>The processor implementation can make a decision to process the 
message based on its processing guarantees.
      * for example, an at-most-once processor can ack the message immediately.
      *
-     * @param message input message.
+     * @param record input message.
      */
-    protected void postReceiveMessage(InputMessage message) {}
+    @Override
+    public void postReceiveMessage(Record record) {}
 
     //
     // Output
@@ -120,8 +134,6 @@ abstract class MessageProcessorBase implements 
MessageProcessor {
 
     @Override
     public void setupOutput(SerDe outputSerDe) throws Exception {
-        this.outputSerDe = outputSerDe;
-
         String outputTopic = functionDetails.getOutput();
         if (outputTopic != null
                 && !functionDetails.getOutput().isEmpty()
@@ -141,10 +153,9 @@ abstract class MessageProcessorBase implements 
MessageProcessor {
     public void close() {
 
         try {
-            this.inputConsumer.close();
-        } catch (PulsarClientException e) {
-            log.warn("Failed to close consumer to input topics {}",
-                    ((MultiTopicsConsumerImpl) 
this.inputConsumer).getTopics(), e);
+            this.source.close();
+        } catch (Exception e) {
+            log.warn("Failed to close source {}", this.source, e);
         }
     }
 }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index b9e762a..c0aae7c 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
@@ -50,7 +51,7 @@ public class JavaInstanceTest {
         JavaInstance instance = new JavaInstance(
             config,
             (Function<String, String>) (input, context) -> input + "-lambda",
-            null, null, null);
+            null, null, mock(PulsarSource.class));
         String testString = "ABC123";
         JavaExecutionResult result = 
instance.handleMessage(MessageId.earliest, "random", testString);
         assertNotNull(result.getResult());
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index c9b3da2..1aeb21a 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -53,6 +53,12 @@ message FunctionDetails {
     bool autoAck = 13;
     repeated string inputs = 14;
     int32 parallelism = 15;
+    ConnectorDetails source = 16;
+}
+
+message ConnectorDetails {
+    string className = 1;
+    map<string, string> configs = 4;
 }
 
 message PackageLocationMetaData {
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 88f1ac7..4b14dad 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -30,6 +30,7 @@ import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function.ConnectorDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
@@ -106,6 +107,13 @@ public class JavaInstanceMain {
     @Parameter(names = "--subscription_type", description = "What subscription 
type to use")
     protected FunctionDetails.SubscriptionType subscriptionType;
 
+    @Parameter(names = "--source_classname", description = "The source 
classname")
+    protected String sourceClassname;
+
+    @Parameter(names = "--source_configs", description = "The source 
classname")
+    protected String sourceConfigs;
+
+
     private Server server;
 
     public JavaInstanceMain() { }
@@ -159,6 +167,16 @@ public class JavaInstanceMain {
             Map<String, String> userConfigMap = new 
Gson().fromJson(userConfig, type);
             functionDetailsBuilder.putAllUserConfig(userConfigMap);
         }
+
+        ConnectorDetails.Builder sourceDetailsBuilder = 
ConnectorDetails.newBuilder();
+        sourceDetailsBuilder.setClassName(sourceClassname);
+        if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
+            Type type = new TypeToken<Map<String, String>>(){}.getType();
+            Map<String, String> sourceConfigMap = new 
Gson().fromJson(sourceConfigs, type);
+            sourceDetailsBuilder.putAllConfigs(sourceConfigMap);
+        }
+        functionDetailsBuilder.setSource(sourceDetailsBuilder);
+
         FunctionDetails functionDetails = functionDetailsBuilder.build();
         instanceConfig.setFunctionDetails(functionDetails);
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index def06cf..8da2b9d 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -175,6 +175,13 @@ class ProcessRuntime implements Runtime {
         instancePort = findAvailablePort();
         args.add("--port");
         args.add(String.valueOf(instancePort));
+        args.add("--source_classname");
+        
args.add(instanceConfig.getFunctionDetails().getSource().getClassName());
+        Map<String, String> sourceConfigs = 
instanceConfig.getFunctionDetails().getSource().getConfigsMap();
+        if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
+            args.add("--source_config");
+            args.add(new Gson().toJson(sourceConfigs));
+        }
 
         return args;
     }
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 830e18a..136951e 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -79,7 +79,15 @@ class ThreadRuntime implements Runtime {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
                 startupException = new Exception(e);
+                log.error("Error occured in java instance:", e);
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e1) {
+                    //ignore
+                }
+                // restart
                 start();
+
             }
         });
         this.fnThread.start();
@@ -95,9 +103,10 @@ class ThreadRuntime implements Runtime {
     @Override
     public void stop() {
         if (fnThread != null) {
+            // Stop instance thread
+            javaInstanceRunnable.stop();
             // interrupt the instance thread
             fnThread.interrupt();
-            javaInstanceRunnable.close();
             try {
                 fnThread.join();
             } catch (InterruptedException e) {
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 7042c26..67ac82f 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.runtime.ProcessRuntime;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
@@ -80,6 +81,8 @@ public class ProcessRuntimeTest {
         functionDetailsBuilder.setOutput(TEST_NAME + "-output");
         
functionDetailsBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer");
         functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
+        functionDetailsBuilder.setSource(Function.ConnectorDetails.newBuilder()
+                .setClassName("org.pulsar.pulsar.TestSource"));
         return functionDetailsBuilder.build();
     }
 
@@ -101,8 +104,7 @@ public class ProcessRuntimeTest {
 
         ProcessRuntime container = factory.createContainer(config, 
userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 43);
-        args.remove(args.size() - 1);
+        assertEquals(args.size(), 45);
         String expectedArgs = "java -cp " + javaInstanceJarFile + " 
-Dlog4j.configurationFile=java_instance_log4j2.yml "
                 + "-Dpulsar.log.dir=" + logDirectory + "/functions" + " 
-Dpulsar.log.file=" + config.getFunctionDetails().getName()
                 + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
@@ -120,7 +122,8 @@ public class ProcessRuntimeTest {
                 + " --output_serde_classname " + 
config.getFunctionDetails().getOutputSerdeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port";
+                + " --max_buffered_tuples 1024 --port " + args.get(42)
+                + " --source_classname " + 
config.getFunctionDetails().getSource().getClassName();
         assertEquals(expectedArgs, String.join(" ", args));
     }
 
@@ -130,8 +133,7 @@ public class ProcessRuntimeTest {
 
         ProcessRuntime container = factory.createContainer(config, 
userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 42);
-        args.remove(args.size() - 1);
+        assertEquals(args.size(), 44);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
                 + logDirectory + "/functions" + " --logging_file " + 
config.getFunctionDetails().getName() + " --instance_id "
@@ -148,7 +150,8 @@ public class ProcessRuntimeTest {
                 + " --output_serde_classname " + 
config.getFunctionDetails().getOutputSerdeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port";
+                + " --max_buffered_tuples 1024 --port " + args.get(41)
+                + " --source_classname " + 
config.getFunctionDetails().getSource().getClassName();
         assertEquals(expectedArgs, String.join(" ", args));
     }
 
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
index 7706030..5ecdd4f 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
@@ -43,17 +43,6 @@ public class FunctionDetailsUtils {
         return fullyQualifiedName.split("/")[2];
     }
 
-    public static boolean areAllRequiredFieldsPresent(FunctionDetails 
FunctionDetails) {
-        if (FunctionDetails.getTenant() == null || 
FunctionDetails.getNamespace() == null
-                || FunctionDetails.getName() == null || 
FunctionDetails.getClassName() == null
-                || (FunctionDetails.getInputsCount() <= 0 && 
FunctionDetails.getCustomSerdeInputsCount() <= 0)
-                || FunctionDetails.getParallelism() <= 0) {
-            return false;
-        } else {
-            return true;
-        }
-    }
-
     public static String getDownloadFileName(FunctionDetails FunctionDetails) {
         String[] hierarchy = FunctionDetails.getClassName().split("\\.");
         String fileName;
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
index 890195c..64418bb 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
@@ -116,6 +116,40 @@ public class Reflections {
 
     }
 
+    public static Object createInstance(String userClassName,
+                                        ClassLoader classLoader, Object[] 
params, Class[] paramTypes) {
+        if (params.length != paramTypes.length) {
+            throw new RuntimeException(
+                    "Unequal number of params and paramTypes. Each param must 
have a correspoinding param type");
+        }
+        Class<?> theCls;
+        try {
+            theCls = Class.forName(userClassName, true, classLoader);
+        } catch (ClassNotFoundException cnfe) {
+            throw new RuntimeException("User class must be in class path", 
cnfe);
+        }
+        Object result;
+        try {
+            Constructor<?> meth = constructorCache.get(theCls);
+            if (null == meth) {
+                meth = theCls.getDeclaredConstructor(paramTypes);
+                meth.setAccessible(true);
+                constructorCache.put(theCls, meth);
+            }
+            result = meth.newInstance(params);
+        } catch (InstantiationException ie) {
+            throw new RuntimeException("User class must be concrete", ie);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException("User class doesn't have such method", 
e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException("User class must have a no-arg 
constructor", e);
+        } catch (InvocationTargetException e) {
+            throw new RuntimeException("User class constructor throws 
exception", e);
+        }
+        return result;
+
+    }
+
     public static Object createInstance(String userClassName, java.io.File 
jar) {
         try {
             return createInstance(userClassName, loadJar(jar));

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to