sijie closed pull request #1606: Enhance Pulsar Connect api by attaching more 
information about the message
URL: https://github.com/apache/incubator-pulsar/pull/1606
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
 
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
index c6a34baf2..73faf053a 100644
--- 
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
+++ 
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
@@ -32,6 +32,7 @@
 import com.aerospike.client.policy.ClientPolicy;
 import com.aerospike.client.policy.WritePolicy;
 import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,10 +78,10 @@ public void close() throws Exception {
     }
 
     @Override
-    public CompletableFuture<Void> write(KeyValue<K, V> tuple) {
+    public CompletableFuture<Void> write(Message<KeyValue<K, V>> tuple) {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Key key = new Key(aerospikeSinkConfig.getKeyspace(), 
aerospikeSinkConfig.getKeySet(), tuple.getKey().toString());
-        Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), 
Value.getAsBlob(tuple.getValue()));
+        Key key = new Key(aerospikeSinkConfig.getKeyspace(), 
aerospikeSinkConfig.getKeySet(), tuple.getData().getKey().toString());
+        Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), 
Value.getAsBlob(tuple.getData().getValue()));
         AWriteListener listener = null;
         try {
             listener = queue.take();
diff --git 
a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
 
b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
index 1ed43a8a0..a003d8f2f 100644
--- 
a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
+++ 
b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
@@ -23,6 +23,7 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,8 +67,8 @@ public void close() throws Exception {
     }
 
     @Override
-    public CompletableFuture<Void> write(KeyValue<K, V> tuple) {
-        BoundStatement bound = statement.bind(tuple.getKey(), 
tuple.getValue());
+    public CompletableFuture<Void> write(Message<KeyValue<K, V>> tuple) {
+        BoundStatement bound = statement.bind(tuple.getData().getKey(), 
tuple.getData().getValue());
         ResultSetFuture future = session.executeAsync(bound);
         CompletableFuture<Void> completable = new CompletableFuture<Void>();
         Futures.addCallback(future,
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java
new file mode 100644
index 000000000..26543435a
--- /dev/null
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.core;
+
+/**
+ * Pulsar Connect's Message interface. Message encapsulates the
+ * information about a message being read/written from/to a Source/Sink.
+ */
+public interface Message<T> {
+    /**
+     * Retrieves the partition information of the message, if any
+     * @return The partition id of the message, or null if it has none
+     */
+    default String getPartitionId() { return null; }
+
+    /**
+     * Retrieves the sequence id of the message
+     * @return Sequence Id associated with the message (-1 by default)
+     */
+    default Long getSequenceId() { return -1L; }
+
+    /**
+     * Retrieves the actual data of the message
+     * @return The message data
+     */
+    T getData();
+}
\ No newline at end of file
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
index 65b006bf6..1da8f78b3 100644
--- 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
@@ -48,5 +48,5 @@
      * to pass messages whenever there is data to be pushed to Pulsar.
      * @param consumer
      */
-    void setConsumer(Function<T, CompletableFuture<Void>> consumer);
+    void setConsumer(Function<Message<T>, CompletableFuture<Void>> consumer);
 }
\ No newline at end of file
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
index e22eb0f20..f7d1b7b64 100644
--- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
+++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
@@ -45,5 +45,5 @@
      * @param message Object to publish to the sink
      * @return Completable future fo async publish request
      */
-    CompletableFuture<Void> write(final T message);
+    CompletableFuture<Void> write(final Message<T> message);
 }
\ No newline at end of file
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
index 1f243099a..18919f4af 100644
--- 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,8 +48,8 @@
     private KafkaSinkConfig kafkaSinkConfig;
 
     @Override
-    public CompletableFuture<Void> write(KeyValue<K, V> message) {
-        ProducerRecord<K, V> record = new 
ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(), 
message.getValue());
+    public CompletableFuture<Void> write(Message<KeyValue<K, V>> message) {
+        ProducerRecord<K, V> record = new 
ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getData().getKey(), 
message.getData().getValue());
         LOG.debug("Message sending to kafka, record={}.", record);
         Future f = producer.send(record);
         return CompletableFuture.supplyAsync(() -> {
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
index dd7066fff..f9bb8c1b0 100644
--- 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.PushSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@
     private KafkaSourceConfig kafkaSourceConfig;
     Thread runnerThread;
 
-    private java.util.function.Function<V, CompletableFuture<Void>> 
consumeFunction;
+    private java.util.function.Function<Message<V>, CompletableFuture<Void>> 
consumeFunction;
 
     @Override
     public void open(Map<String, String> config) throws Exception {
@@ -101,7 +102,7 @@ public void start() {
                 int index = 0;
                 for (ConsumerRecord<String, V> record : records) {
                     LOG.debug("Message received from kafka, key: {}. value: 
{}", record.key(), record.value());
-                    futures[index] = consumeFunction.apply(record.value());
+                    futures[index] = consumeFunction.apply(new 
KafkaMesssage<>(record));
                     index++;
                 }
                 if (!kafkaSourceConfig.isAutoCommitEnabled()) {
@@ -120,7 +121,30 @@ public void start() {
     }
 
     @Override
-    public void setConsumer(java.util.function.Function<V, 
CompletableFuture<Void>> consumeFunction) {
+    public void setConsumer(java.util.function.Function<Message<V>, 
CompletableFuture<Void>> consumeFunction) {
         this.consumeFunction = consumeFunction;
     }
+
+    static private class KafkaMesssage<V> implements Message<V>  {
+        ConsumerRecord<String, V> record;
+
+        public KafkaMesssage(ConsumerRecord<String, V> record) {
+            this.record = record;
+
+        }
+        @Override
+        public String getPartitionId() {
+            return Integer.toString(record.partition());
+        }
+
+        @Override
+        public Long getSequenceId() {
+            return record.offset();
+        }
+
+        @Override
+        public V getData() {
+            return record.value();
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
index 59ce73ba7..847791a7a 100644
--- 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
+++ 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.connect.rabbitmq;
 
 import com.rabbitmq.client.*;
+import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.PushSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,13 +37,13 @@
 
     private static Logger logger = 
LoggerFactory.getLogger(RabbitMQSource.class);
 
-    private Function<byte[], CompletableFuture<Void>> consumer;
+    private Function<Message<byte[]>, CompletableFuture<Void>> consumer;
     private Connection rabbitMQConnection;
     private Channel rabbitMQChannel;
     private RabbitMQConfig rabbitMQConfig;
 
     @Override
-    public void setConsumer(Function<byte[], CompletableFuture<Void>> 
consumeFunction) {
+    public void setConsumer(Function<Message<byte[]>, CompletableFuture<Void>> 
consumeFunction) {
         this.consumer = consumeFunction;
     }
 
@@ -74,16 +75,29 @@ public void close() throws Exception {
     }
 
     private class RabbitMQConsumer extends DefaultConsumer {
-        private Function<byte[], CompletableFuture<Void>> consumeFunction;
+        private Function<Message<byte[]>, CompletableFuture<Void>> 
consumeFunction;
 
-        public RabbitMQConsumer(Function<byte[], CompletableFuture<Void>> 
consumeFunction, Channel channel) {
+        public RabbitMQConsumer(Function<Message<byte[]>, 
CompletableFuture<Void>> consumeFunction, Channel channel) {
             super(channel);
             this.consumeFunction = consumeFunction;
         }
 
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
-            consumeFunction.apply(body);
+            consumeFunction.apply(new RabbitMQMessage(body));
+        }
+    }
+
+    static private class RabbitMQMessage implements Message<byte[]> {
+        private byte[] data;
+
+        public RabbitMQMessage(byte[] data) {
+            this.data = data;
+        }
+
+        @Override
+        public byte[] getData() {
+            return data;
         }
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
index 9f6ebbe66..3f2316e31 100644
--- 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
+++ 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
@@ -26,6 +26,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
+import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.PushSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@
 
     // ----- Runtime fields
     private Object waitObject;
-    private Function<String, CompletableFuture<Void>> consumeFunction;
+    private Function<Message<String>, CompletableFuture<Void>> consumeFunction;
 
     @Override
     public void open(Map<String, String> config) throws IOException {
@@ -67,7 +68,7 @@ public void open(Map<String, String> config) throws 
IOException {
     }
 
     @Override
-    public void setConsumer(Function<String, CompletableFuture<Void>> 
consumeFunction) {
+    public void setConsumer(Function<Message<String>, CompletableFuture<Void>> 
consumeFunction) {
         this.consumeFunction = consumeFunction;
     }
 
@@ -126,7 +127,7 @@ public boolean process() throws IOException, 
InterruptedException {
                             // We don't really care if the future succeeds or 
not.
                             // However might be in the future to count failures
                             // TODO:- Figure out the metrics story for 
connectors
-                            consumeFunction.apply(line);
+                            consumeFunction.apply(new TwitterMessage(line));
                         } catch (Exception e) {
                             LOG.error("Exception thrown");
                         }
@@ -164,4 +165,17 @@ private void stopThread() {
         }
     }
 
+    static private class TwitterMessage implements Message<String> {
+        private String tweet;
+
+        public TwitterMessage(String tweet) {
+            this.tweet = tweet;
+        }
+
+        @Override
+        public String getData() {
+            return tweet;
+        }
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to