This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fad36b   Rename Connect `Message` interface to `Record` (#1636)
1fad36b is described below

commit 1fad36bb81f772d7851167b4d2170d388547a620
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Tue Apr 24 17:17:44 2018 -0700

     Rename Connect `Message` interface to `Record` (#1636)
    
    * Rename Connect `Message` interface to `Record`
    
    *Motivation*
    
    Having two different `Message` interfaces is a bit confusing and also 
introduces unnecessary object allocation.
    
    *Solution*
    
    - rename connect `Message` interface to `Record` interface
    - introduce an abstract base implementation for both `api.Message` and 
`connect.Record`
    - change the call sites to use the new class
    
    * Add the missing class file
    
    * - change Map<String, String> to Map<String, Object> for all open calls
    - add a `RuntimeSink` in functions which extends Sink and provides the 
flexibility to control acking behavior for pulsar sink
    
    * record sequence should be part of RecordContext
---
 pulsar-client-admin-shaded-for-functions/pom.xml   |  7 ++
 pulsar-client-admin-shaded/pom.xml                 |  7 ++
 pulsar-client-shaded/pom.xml                       |  7 ++
 pulsar-client/pom.xml                              |  6 ++
 .../org/apache/pulsar/client/impl/MessageImpl.java |  3 +-
 .../pulsar/client/impl/MessageRecordImpl.java      | 65 +++++++++++++++
 .../pulsar/client/impl/TopicMessageImpl.java       |  9 +-
 .../pulsar/client/impl/MessageRecordImplTest.java  | 60 ++++++++++++++
 .../pulsar/connect/aerospike/AerospikeSink.java    |  9 +-
 .../connect/aerospike/AerospikeSinkConfig.java     |  2 +-
 .../pulsar/connect/cassandra/CassandraSink.java    |  7 +-
 .../connect/cassandra/CassandraSinkConfig.java     |  2 +-
 .../org/apache/pulsar/connect/core/PushSource.java |  4 +-
 .../connect/core/{Source.java => Record.java}      | 24 ++----
 .../core/{Message.java => RecordContext.java}      | 27 +++---
 .../java/org/apache/pulsar/connect/core/Sink.java  |  6 +-
 .../org/apache/pulsar/connect/core/Source.java     |  2 +-
 .../org/apache/pulsar/connect/kafka/KafkaSink.java |  9 +-
 .../pulsar/connect/kafka/KafkaSinkConfig.java      |  2 +-
 .../apache/pulsar/connect/kafka/KafkaSource.java   | 22 ++---
 .../pulsar/connect/kafka/KafkaSourceConfig.java    |  2 +-
 .../pulsar/connect/rabbitmq/RabbitMQConfig.java    |  2 +-
 .../pulsar/connect/rabbitmq/RabbitMQSource.java    | 22 ++---
 .../pulsar/connect/twitter/TwitterFireHose.java    | 16 ++--
 .../connect/twitter/TwitterFireHoseConfig.java     |  2 +-
 .../pulsar/functions/sink/DefaultRuntimeSink.java  | 46 ++++++++---
 .../apache/pulsar/functions/sink/RuntimeSink.java  | 52 ++++++++++++
 .../functions/sink/DefaultRuntimeSinkTest.java     | 96 ++++++++++++++++++++++
 pulsar-functions/utils/pom.xml                     |  4 +
 29 files changed, 414 insertions(+), 108 deletions(-)

diff --git a/pulsar-client-admin-shaded-for-functions/pom.xml 
b/pulsar-client-admin-shaded-for-functions/pom.xml
index 5953b75..1e38333 100644
--- a/pulsar-client-admin-shaded-for-functions/pom.xml
+++ b/pulsar-client-admin-shaded-for-functions/pom.xml
@@ -61,6 +61,7 @@
                 <includes>
                   <include>org.apache.pulsar:pulsar-common</include>
                   <include>org.apache.bookkeeper:circe-checksum</include>
+                  <include>org.apache.pulsar:pulsar-connect-core</include>
                   <include>org.apache.pulsar:pulsar-client-original</include>
                   
<include>org.apache.pulsar:pulsar-client-admin-original</include>
                   <!-- client dependencies as below -->
@@ -107,6 +108,12 @@
                   </includes>
                 </filter>
                 <filter>
+                  <artifact>org.apache.pulsar:pulsar-connect-core</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
+                <filter>
                   <artifact>org.apache.pulsar:pulsar-client-original</artifact>
                   <includes>
                     <include>**</include>
diff --git a/pulsar-client-admin-shaded/pom.xml 
b/pulsar-client-admin-shaded/pom.xml
index fcd9349..df5f867 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -57,6 +57,7 @@
 
               <artifactSet>
                 <includes>
+                  <include>org.apache.pulsar:pulsar-connect-core</include>
                   <include>org.apache.pulsar:pulsar-client-original</include>
                   
<include>org.apache.pulsar:pulsar-client-admin-original</include>
                   <include>org.apache.commons:commons-lang3</include>
@@ -93,6 +94,12 @@
                   </includes>
                 </filter>
                 <filter>
+                  <artifact>org.apache.pulsar:pulsar-connect-core</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
+                <filter>
                   <artifact>org.apache.pulsar:pulsar-client-original</artifact>
                   <includes>
                     <include>**</include>
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index c67c546..14ace60 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -59,6 +59,7 @@
 
               <artifactSet>
                 <includes>
+                  <include>org.apache.pulsar:pulsar-connect-core</include>
                   <include>org.apache.pulsar:pulsar-client-original</include>
                   <include>org.apache.commons:commons-lang3</include>
                   <include>commons-codec:commons-codec</include>
@@ -94,6 +95,12 @@
                   </includes>
                 </filter>
                 <filter>
+                  <artifact>org.apache.pulsar:pulsar-connect-core</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
+                <filter>
                   <artifact>org.apache.pulsar:pulsar-client-original</artifact>
                   <includes>
                     <include>**</include>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 267c5e7..f6b5b40 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -40,6 +40,12 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-connect-core</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index f9ba65c..2989d30 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -43,10 +43,9 @@ import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
-public class MessageImpl<T> implements Message<T> {
+public class MessageImpl<T> extends MessageRecordImpl<T, MessageId> {
 
     private MessageMetadata.Builder msgMetadataBuilder;
-    private MessageId messageId;
     private ClientCnx cnx;
     private ByteBuf payload;
     private Schema<T> schema;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
new file mode 100644
index 0000000..5e8128a
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.connect.core.Record;
+
+/**
+ * Abstract class that implements message api and connect record api.
+ */
+public abstract class MessageRecordImpl<T, M extends MessageId> implements 
Message<T>, Record<T> {
+
+    protected M messageId;
+    protected java.util.function.Consumer<M> ackFunction;
+
+    public void setAckFunction(java.util.function.Consumer<M> ackFunction) {
+        this.ackFunction = ackFunction;
+    }
+
+    @Override
+    public String getPartitionId() {
+        if (null != messageId) {
+            if (messageId instanceof MessageIdImpl) {
+                return String.valueOf(((MessageIdImpl) 
messageId).getPartitionIndex());
+            } else {
+                return "";
+            }
+        }
+        return "";
+    }
+
+    @Override
+    public long getRecordSequence() {
+        return getSequenceId();
+    }
+
+    @Override
+    public void ack() {
+        if (null != ackFunction) {
+            ackFunction.accept((M) getMessageId());
+        }
+    }
+
+    @Override
+    public void fail() {
+        // no-op
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 5ae452e..18e2088 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -23,17 +23,16 @@ import java.util.Map;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 
-public class TopicMessageImpl<T> implements Message<T> {
+public class TopicMessageImpl<T> extends MessageRecordImpl<T, 
TopicMessageIdImpl> {
 
     private final String topicName;
     private final Message<T> msg;
-    private final TopicMessageIdImpl msgId;
 
     TopicMessageImpl(String topicName,
                      Message<T> msg) {
         this.topicName = topicName;
         this.msg = msg;
-        this.msgId = new TopicMessageIdImpl(topicName, msg.getMessageId());
+        this.messageId = new TopicMessageIdImpl(topicName, msg.getMessageId());
     }
 
     /**
@@ -46,11 +45,11 @@ public class TopicMessageImpl<T> implements Message<T> {
 
     @Override
     public MessageId getMessageId() {
-        return msgId;
+        return messageId;
     }
 
     public MessageId getInnerMessageId() {
-        return msgId.getInnerMessageId();
+        return messageId.getInnerMessageId();
     }
 
     @Override
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
new file mode 100644
index 0000000..7df373a
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertSame;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.client.api.MessageId;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class MessageRecordImplTest {
+
+    private MessageRecordImpl<byte[], MessageId> message;
+    private MessageId messageId;
+
+    @BeforeMethod
+    public void setup() {
+        this.messageId = mock(MessageId.class);
+        this.message = mock(MessageRecordImpl.class, CALLS_REAL_METHODS);
+        when(message.getMessageId()).thenReturn(messageId);
+    }
+
+    @Test
+    public void testAck() throws Exception {
+        final AtomicReference<MessageId> msgIdRef = new AtomicReference<>();
+        final CountDownLatch ackLatch = new CountDownLatch(1);
+
+        this.message.setAckFunction(msgId -> {
+            msgIdRef.set(msgId);
+            ackLatch.countDown();
+        });
+
+        this.message.ack();
+        ackLatch.await();
+
+        assertSame(messageId, msgIdRef.get());
+    }
+
+}
diff --git 
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
 
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
index 73faf05..daab766 100644
--- 
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
+++ 
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
@@ -32,7 +32,6 @@ import com.aerospike.client.listener.WriteListener;
 import com.aerospike.client.policy.ClientPolicy;
 import com.aerospike.client.policy.WritePolicy;
 import org.apache.pulsar.common.util.KeyValue;
-import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +51,7 @@ public class AerospikeSink<K, V> implements Sink<KeyValue<K, 
V>> {
     private EventLoop eventLoop;
 
     @Override
-    public void open(Map<String, String> config) throws Exception {
+    public void open(Map<String, Object> config) throws Exception {
         aerospikeSinkConfig = AerospikeSinkConfig.load(config);
         if (aerospikeSinkConfig.getSeedHosts() == null
                 || aerospikeSinkConfig.getKeyspace() == null
@@ -78,10 +77,10 @@ public class AerospikeSink<K, V> implements 
Sink<KeyValue<K, V>> {
     }
 
     @Override
-    public CompletableFuture<Void> write(Message<KeyValue<K, V>> tuple) {
+    public CompletableFuture<Void> write(KeyValue<K, V> record) {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Key key = new Key(aerospikeSinkConfig.getKeyspace(), 
aerospikeSinkConfig.getKeySet(), tuple.getData().getKey().toString());
-        Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), 
Value.getAsBlob(tuple.getData().getValue()));
+        Key key = new Key(aerospikeSinkConfig.getKeyspace(), 
aerospikeSinkConfig.getKeySet(), record.getKey().toString());
+        Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), 
Value.getAsBlob(record.getValue()));
         AWriteListener listener = null;
         try {
             listener = queue.take();
diff --git 
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
 
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
index 72f530b..ef02c80 100644
--- 
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
+++ 
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
@@ -57,7 +57,7 @@ public class AerospikeSinkConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), AerospikeSinkConfig.class);
     }
 
-    public static AerospikeSinkConfig load(Map<String, String> map) throws 
IOException {
+    public static AerospikeSinkConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
AerospikeSinkConfig.class);
     }
diff --git 
a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
 
b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
index a003d8f..bc87ec6 100644
--- 
a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
+++ 
b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
@@ -23,7 +23,6 @@ import com.datastax.driver.core.*;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.pulsar.common.util.KeyValue;
-import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +45,7 @@ public class CassandraSink<K, V> implements Sink<KeyValue<K, 
V>> {
     private PreparedStatement statement;
 
     @Override
-    public void open(Map<String, String> config) throws Exception {
+    public void open(Map<String, Object> config) throws Exception {
         cassandraSinkConfig = CassandraSinkConfig.load(config);
         if (cassandraSinkConfig.getRoots() == null
                 || cassandraSinkConfig.getKeyspace() == null
@@ -67,8 +66,8 @@ public class CassandraSink<K, V> implements Sink<KeyValue<K, 
V>> {
     }
 
     @Override
-    public CompletableFuture<Void> write(Message<KeyValue<K, V>> tuple) {
-        BoundStatement bound = statement.bind(tuple.getData().getKey(), 
tuple.getData().getValue());
+    public CompletableFuture<Void> write(KeyValue<K, V> record) {
+        BoundStatement bound = statement.bind(record.getKey(), 
record.getValue());
         ResultSetFuture future = session.executeAsync(bound);
         CompletableFuture<Void> completable = new CompletableFuture<Void>();
         Futures.addCallback(future,
diff --git 
a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java
 
b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java
index b21c95e..5bcfb52 100644
--- 
a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java
+++ 
b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java
@@ -50,7 +50,7 @@ public class CassandraSinkConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), CassandraSinkConfig.class);
     }
 
-    public static CassandraSinkConfig load(Map<String, String> map) throws 
IOException {
+    public static CassandraSinkConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
CassandraSinkConfig.class);
     }
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
index 1da8f78..4e6f64b 100644
--- 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
@@ -41,12 +41,12 @@ public interface PushSource<T> extends AutoCloseable {
      * @param config initialization config
      * @throws Exception IO type exceptions when opening a connector
      */
-    void open(final Map<String, String> config) throws Exception;
+    void open(final Map<String, Object> config) throws Exception;
 
     /**
      * Attach a consumer function to this Source. This is invoked by the 
implementation
      * to pass messages whenever there is data to be pushed to Pulsar.
      * @param consumer
      */
-    void setConsumer(Function<Message<T>, CompletableFuture<Void>> consumer);
+    void setConsumer(Function<Record<T>, CompletableFuture<Void>> consumer);
 }
\ No newline at end of file
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Record.java
similarity index 57%
copy from 
pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
copy to 
pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Record.java
index b63a4b7..c5137ea 100644
--- 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Record.java
@@ -18,22 +18,14 @@
  */
 package org.apache.pulsar.connect.core;
 
-import java.util.Map;
-
-public interface Source<T> extends AutoCloseable {
-    /**
-     * Open connector with configuration
-     *
-     * @param config initialization config
-     * @throws Exception IO type exceptions when opening a connector
-     */
-    void open(final Map<String, String> config) throws Exception;
-
+/**
+ * Pulsar Connect's Record interface. Record encapsulates the
+ * information about a record being read from a Source.
+ */
+public interface Record<T> extends RecordContext {
     /**
-     * Reads the next message from source, if one exists, and returns.  This 
call should be non-blocking.
-     * If source does not have any new messages, return null immediately.
-     * @return next message from source or null, if no new messages are 
available.
-     * @throws Exception
+     * Retrieves the actual data of the record
+     * @return The record data
      */
-    Message<T> read() throws Exception;
+    T getValue();
 }
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/RecordContext.java
similarity index 60%
rename from 
pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java
rename to 
pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/RecordContext.java
index 1fcdb25..094ca8c 100644
--- 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/RecordContext.java
@@ -19,35 +19,30 @@
 package org.apache.pulsar.connect.core;
 
 /**
- * Pulsar Connect's Message interface. Message encapsulates the
- * information about a message being read/written from/to a Source/Sink.
+ * A source context that can be used by the runtime to interact with source.
  */
-public interface Message<T> {
+public interface RecordContext {
+
     /**
-     * Retrieves the partition information if any of the message
+     * Retrieves the partition information if any of the record.
      * @return The partition id where the
      */
     default String getPartitionId() { return null; }
 
     /**
-     * Retrieves the sequence id of the message
-     * @return Sequence Id associated with the message
+     * Retrieves the sequence of the record from a source partition.
+     * @return Sequence Id associated with the record
      */
-    default Long getSequenceId() { return -1L; }
+    default long getRecordSequence() { return -1L; }
 
     /**
-     * Retrieves the actual data of the message
-     * @return The message data
+     * Acknowledge that this record is fully processed
      */
-    T getData();
+    default void ack() {}
 
     /**
-     * Acknowledge that this message is fully processed
+     * To indicate that this record has failed to be processed
      */
-    default void ack() {};
+    default void fail() {}
 
-    /**
-     * To indicate that this message has failed to be processed
-     */
-    default void fail() {};
 }
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
index f7d1b7b..ca569e7 100644
--- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
+++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
@@ -37,13 +37,13 @@ public interface Sink<T> extends AutoCloseable {
      * @param config initialization config
      * @throws Exception IO type exceptions when opening a connector
      */
-    void open(final Map<String, String> config) throws Exception;
+    void open(final Map<String, Object> config) throws Exception;
 
     /**
      * Attempt to publish a type safe collection of messages
      *
-     * @param message Object to publish to the sink
+     * @param value output value
      * @return Completable future fo async publish request
      */
-    CompletableFuture<Void> write(final Message<T> message);
+    CompletableFuture<Void> write(T value);
 }
\ No newline at end of file
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
index b63a4b7..40f1820 100644
--- 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
@@ -35,5 +35,5 @@ public interface Source<T> extends AutoCloseable {
      * @return next message from source or null, if no new messages are 
available.
      * @throws Exception
      */
-    Message<T> read() throws Exception;
+    Record<T> read() throws Exception;
 }
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
index 18919f4..fc8e2af 100644
--- 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.pulsar.common.util.KeyValue;
-import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,9 +47,9 @@ public class KafkaSink<K, V> implements Sink<KeyValue<K, V>> {
     private KafkaSinkConfig kafkaSinkConfig;
 
     @Override
-    public CompletableFuture<Void> write(Message<KeyValue<K, V>> message) {
-        ProducerRecord<K, V> record = new 
ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getData().getKey(), 
message.getData().getValue());
-        LOG.debug("Message sending to kafka, record={}.", record);
+    public CompletableFuture<Void> write(KeyValue<K, V> message) {
+        ProducerRecord<K, V> record = new 
ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(), 
message.getValue());
+        LOG.debug("Record sending to kafka, record={}.", record);
         Future f = producer.send(record);
         return CompletableFuture.supplyAsync(() -> {
             try {
@@ -69,7 +68,7 @@ public class KafkaSink<K, V> implements Sink<KeyValue<K, V>> {
     }
 
     @Override
-    public void open(Map<String, String> config) throws Exception {
+    public void open(Map<String, Object> config) throws Exception {
         kafkaSinkConfig = KafkaSinkConfig.load(config);
         if (kafkaSinkConfig.getTopic() == null
                 || kafkaSinkConfig.getBootstrapServers() == null
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java
index 45aea78..6da494e 100644
--- 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java
@@ -52,7 +52,7 @@ public class KafkaSinkConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), KafkaSinkConfig.class);
     }
 
-    public static KafkaSinkConfig load(Map<String, String> map) throws 
IOException {
+    public static KafkaSinkConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
KafkaSinkConfig.class);
     }
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
index f9bb8c1..7520ee6 100644
--- 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.pulsar.connect.core.Message;
+import org.apache.pulsar.connect.core.Record;
 import org.apache.pulsar.connect.core.PushSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,10 +45,10 @@ public class KafkaSource<V> implements PushSource<V> {
     private KafkaSourceConfig kafkaSourceConfig;
     Thread runnerThread;
 
-    private java.util.function.Function<Message<V>, CompletableFuture<Void>> 
consumeFunction;
+    private java.util.function.Function<Record<V>, CompletableFuture<Void>> 
consumeFunction;
 
     @Override
-    public void open(Map<String, String> config) throws Exception {
+    public void open(Map<String, Object> config) throws Exception {
         kafkaSourceConfig = KafkaSourceConfig.load(config);
         if (kafkaSourceConfig.getTopic() == null
                 || kafkaSourceConfig.getBootstrapServers() == null
@@ -101,8 +101,8 @@ public class KafkaSource<V> implements PushSource<V> {
                 CompletableFuture<?>[] futures = new 
CompletableFuture<?>[records.count()];
                 int index = 0;
                 for (ConsumerRecord<String, V> record : records) {
-                    LOG.debug("Message received from kafka, key: {}. value: 
{}", record.key(), record.value());
-                    futures[index] = consumeFunction.apply(new 
KafkaMesssage<>(record));
+                    LOG.debug("Record received from kafka, key: {}. value: 
{}", record.key(), record.value());
+                    futures[index] = consumeFunction.apply(new 
KafkaRecord<>(record));
                     index++;
                 }
                 if (!kafkaSourceConfig.isAutoCommitEnabled()) {
@@ -121,14 +121,14 @@ public class KafkaSource<V> implements PushSource<V> {
     }
 
     @Override
-    public void setConsumer(java.util.function.Function<Message<V>, 
CompletableFuture<Void>> consumeFunction) {
+    public void setConsumer(java.util.function.Function<Record<V>, 
CompletableFuture<Void>> consumeFunction) {
         this.consumeFunction = consumeFunction;
     }
 
-    static private class KafkaMesssage<V> implements Message<V>  {
-        ConsumerRecord<String, V> record;
+    static private class KafkaRecord<V> implements Record<V> {
+        private final ConsumerRecord<String, V> record;
 
-        public KafkaMesssage(ConsumerRecord<String, V> record) {
+        public KafkaRecord(ConsumerRecord<String, V> record) {
             this.record = record;
 
         }
@@ -138,12 +138,12 @@ public class KafkaSource<V> implements PushSource<V> {
         }
 
         @Override
-        public Long getSequenceId() {
+        public long getRecordSequence() {
             return record.offset();
         }
 
         @Override
-        public V getData() {
+        public V getValue() {
             return record.value();
         }
     }
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java
index 77fd77b..0d41b1b 100644
--- 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java
@@ -54,7 +54,7 @@ public class KafkaSourceConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), KafkaSourceConfig.class);
     }
 
-    public static KafkaSourceConfig load(Map<String, String> map) throws 
IOException {
+    public static KafkaSourceConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
KafkaSourceConfig.class);
     }
diff --git 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
index 1d7268e..e76b03f 100644
--- 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
+++ 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
@@ -48,7 +48,7 @@ public class RabbitMQConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), RabbitMQConfig.class);
     }
 
-    public static RabbitMQConfig load(Map<String, String> map) throws 
IOException {
+    public static RabbitMQConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
RabbitMQConfig.class);
     }
diff --git 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
index 847791a..c548f99 100644
--- 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
+++ 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
@@ -20,7 +20,7 @@
 package org.apache.pulsar.connect.rabbitmq;
 
 import com.rabbitmq.client.*;
-import org.apache.pulsar.connect.core.Message;
+import org.apache.pulsar.connect.core.Record;
 import org.apache.pulsar.connect.core.PushSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,18 +37,18 @@ public class RabbitMQSource implements PushSource<byte[]> {
 
     private static Logger logger = 
LoggerFactory.getLogger(RabbitMQSource.class);
 
-    private Function<Message<byte[]>, CompletableFuture<Void>> consumer;
+    private Function<Record<byte[]>, CompletableFuture<Void>> consumer;
     private Connection rabbitMQConnection;
     private Channel rabbitMQChannel;
     private RabbitMQConfig rabbitMQConfig;
 
     @Override
-    public void setConsumer(Function<Message<byte[]>, CompletableFuture<Void>> 
consumeFunction) {
+    public void setConsumer(Function<Record<byte[]>, CompletableFuture<Void>> 
consumeFunction) {
         this.consumer = consumeFunction;
     }
 
     @Override
-    public void open(Map<String, String> config) throws Exception {
+    public void open(Map<String, Object> config) throws Exception {
         rabbitMQConfig = RabbitMQConfig.load(config);
         if (rabbitMQConfig.getAmqUri() == null
                 || rabbitMQConfig.getQueueName() == null) {
@@ -75,28 +75,28 @@ public class RabbitMQSource implements PushSource<byte[]> {
     }
 
     private class RabbitMQConsumer extends DefaultConsumer {
-        private Function<Message<byte[]>, CompletableFuture<Void>> 
consumeFunction;
+        private Function<Record<byte[]>, CompletableFuture<Void>> 
consumeFunction;
 
-        public RabbitMQConsumer(Function<Message<byte[]>, 
CompletableFuture<Void>> consumeFunction, Channel channel) {
+        public RabbitMQConsumer(Function<Record<byte[]>, 
CompletableFuture<Void>> consumeFunction, Channel channel) {
             super(channel);
             this.consumeFunction = consumeFunction;
         }
 
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
-            consumeFunction.apply(new RabbitMQMessage(body));
+            consumeFunction.apply(new RabbitMQRecord(body));
         }
     }
 
-    static private class RabbitMQMessage implements Message<byte[]> {
-        private byte[] data;
+    static private class RabbitMQRecord implements Record<byte[]> {
+        private final byte[] data;
 
-        public RabbitMQMessage(byte[] data) {
+        public RabbitMQRecord(byte[] data) {
             this.data = data;
         }
 
         @Override
-        public byte[] getData() {
+        public byte[] getValue() {
             return data;
         }
     }
diff --git 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
index 3f2316e..1dcbb17 100644
--- 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
+++ 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
-import org.apache.pulsar.connect.core.Message;
+import org.apache.pulsar.connect.core.Record;
 import org.apache.pulsar.connect.core.PushSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,10 +52,10 @@ public class TwitterFireHose implements PushSource<String> {
 
     // ----- Runtime fields
     private Object waitObject;
-    private Function<Message<String>, CompletableFuture<Void>> consumeFunction;
+    private Function<Record<String>, CompletableFuture<Void>> consumeFunction;
 
     @Override
-    public void open(Map<String, String> config) throws IOException {
+    public void open(Map<String, Object> config) throws IOException {
         TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config);
         if (hoseConfig.getConsumerKey() == null
                 || hoseConfig.getConsumerSecret() == null
@@ -68,7 +68,7 @@ public class TwitterFireHose implements PushSource<String> {
     }
 
     @Override
-    public void setConsumer(Function<Message<String>, CompletableFuture<Void>> 
consumeFunction) {
+    public void setConsumer(Function<Record<String>, CompletableFuture<Void>> 
consumeFunction) {
         this.consumeFunction = consumeFunction;
     }
 
@@ -127,7 +127,7 @@ public class TwitterFireHose implements PushSource<String> {
                             // We don't really care if the future succeeds or 
not.
                             // However might be in the future to count failures
                             // TODO:- Figure out the metrics story for 
connectors
-                            consumeFunction.apply(new TwitterMessage(line));
+                            consumeFunction.apply(new TwitterRecord(line));
                         } catch (Exception e) {
                             LOG.error("Exception thrown");
                         }
@@ -165,15 +165,15 @@ public class TwitterFireHose implements 
PushSource<String> {
         }
     }
 
-    static private class TwitterMessage implements Message<String> {
+    static private class TwitterRecord implements Record<String> {
         private String tweet;
 
-        public TwitterMessage(String tweet) {
+        public TwitterRecord(String tweet) {
             this.tweet = tweet;
         }
 
         @Override
-        public String getData() {
+        public String getValue() {
             return tweet;
         }
     }
diff --git 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
index f0614bd..57782e2 100644
--- 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
+++ 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
@@ -56,7 +56,7 @@ public class TwitterFireHoseConfig implements Serializable {
         return mapper.readValue(new File(yamlFile), 
TwitterFireHoseConfig.class);
     }
 
-    public static TwitterFireHoseConfig load(Map<String, String> map) throws 
IOException {
+    public static TwitterFireHoseConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
TwitterFireHoseConfig.class);
     }
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
similarity index 58%
copy from 
pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
copy to 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
index f7d1b7b..8e0d37a 100644
--- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
@@ -16,34 +16,54 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.connect.core;
+package org.apache.pulsar.functions.sink;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.connect.core.RecordContext;
+import org.apache.pulsar.connect.core.Sink;
 
 /**
- * Pulsar's Sink interface. Sink read data from
- * a Pulsar topic and write it to external sinks(kv store, database, 
filesystem ,etc)
- * The lifcycle of a Sink is to open it passing any config needed
- * by it to initialize(like open network connection, authenticate, etc).
- * On every message from the designated PulsarTopic, the write method is
- * invoked which writes the message to the external sink. One can use close
- * at the end of the session to do any cleanup
+ * The default implementation of runtime sink.
+ *
+ * @param <T>
  */
-public interface Sink<T> extends AutoCloseable {
+public class DefaultRuntimeSink<T> implements RuntimeSink<T> {
+
+    public static <T> DefaultRuntimeSink<T> of(Sink<T> sink) {
+        return new DefaultRuntimeSink<>(sink);
+    }
+
+    private final Sink<T> sink;
+
+    private DefaultRuntimeSink(Sink<T> sink) {
+        this.sink = sink;
+    }
+
     /**
      * Open connector with configuration
      *
      * @param config initialization config
      * @throws Exception IO type exceptions when opening a connector
      */
-    void open(final Map<String, String> config) throws Exception;
+    @Override
+    public void open(final Map<String, Object> config) throws Exception {
+        sink.open(config);
+    }
 
     /**
      * Attempt to publish a type safe collection of messages
      *
-     * @param message Object to publish to the sink
+     * @param value output value
      * @return Completable future fo async publish request
      */
-    CompletableFuture<Void> write(final Message<T> message);
-}
\ No newline at end of file
+    @Override
+    public CompletableFuture<Void> write(T value) {
+        return sink.write(value);
+    }
+
+    @Override
+    public void close() throws Exception {
+        sink.close();
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
new file mode 100644
index 0000000..63a48ec
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.sink;
+
+import org.apache.pulsar.connect.core.RecordContext;
+import org.apache.pulsar.connect.core.Sink;
+
+/**
+ * This interface extends the connect {@link Sink}.
+ *
+ * <p>The runtime should interact with this sink rather than directly with the public 
{@link Sink} interface.
+ *
+ * <p>There is a default implementation provided for wrapping up the user 
provided {@link Sink}. Pulsar sink
+ * should be implemented using this interface to ensure support for 
effective-once.
+ */
+public interface RuntimeSink<T> extends Sink<T> {
+
+    /**
+     * Write the <tt>value</tt>.
+     *
+     * <p>The implementation of this interface is responsible for notifying the 
runtime whether processing of the input
+     * record that produced this value succeeded or failed, by calling {@link 
RecordContext#ack} or {@link RecordContext#fail}.
+     *
+     * @param inputRecordContext input record context
+     * @param value output value computed from the runtime.
+     */
+    default void write(RecordContext inputRecordContext, T value) {
+        write(value)
+            .thenAccept(ignored -> inputRecordContext.ack())
+            .exceptionally(cause -> {
+                inputRecordContext.fail();
+                return null;
+            });
+    }
+
+}
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
new file mode 100644
index 0000000..7c58c30
--- /dev/null
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.sink;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.connect.core.RecordContext;
+import org.apache.pulsar.connect.core.Sink;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link DefaultRuntimeSink}.
+ */
+public class DefaultRuntimeSinkTest {
+
+    private Sink<String> mockSink;
+    private RuntimeSink<String> runtimeSink;
+
+    @BeforeMethod
+    public void setup() {
+        this.mockSink = mock(Sink.class);
+        this.runtimeSink = DefaultRuntimeSink.of(mockSink);
+    }
+
+    @Test
+    public void testOpen() throws Exception {
+        this.runtimeSink.open(Collections.emptyMap());
+
+        verify(mockSink, times(1)).open(any(Map.class));
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        this.runtimeSink.close();
+
+        verify(mockSink, times(1)).close();
+    }
+
+    @Test
+    public void testWrite() {
+        this.runtimeSink.write("test-record");
+        verify(mockSink, times(1)).write(eq("test-record"));
+    }
+
+    @Test
+    public void testWriteAck() {
+        RecordContext context = mock(RecordContext.class);
+
+        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+        writeFuture.complete(null);
+        when(mockSink.write(anyString())).thenReturn(writeFuture);
+
+        runtimeSink.write(context, "test-record");
+
+        verify(context, times(1)).ack();
+    }
+
+    @Test
+    public void testWriteFail() {
+        RecordContext context = mock(RecordContext.class);
+
+        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+        writeFuture.completeExceptionally(new Exception("test-exception"));
+        when(mockSink.write(anyString())).thenReturn(writeFuture);
+
+        runtimeSink.write(context, "test-record");
+
+        verify(context, times(1)).fail();
+    }
+}
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index e4bde12..94a2c50 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -49,6 +49,10 @@
         </exclusion>
         <exclusion>
           <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-connect-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
           <artifactId>pulsar-client-original</artifactId>
         </exclusion>
         <exclusion>

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to