This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 1fad36b Rename Connect `Message` interface to `Record` (#1636) 1fad36b is described below commit 1fad36bb81f772d7851167b4d2170d388547a620 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Tue Apr 24 17:17:44 2018 -0700 Rename Connect `Message` interface to `Record` (#1636) * Rename Connect `Message` interface to `Record` *Motivation* Having two different `Message` interfaces is a bit confusing and also introduced unnecessary object allocation. *Solution* - rename connect `Message` interface to `Record` interface - introduce an abstract base implementation for both `api.Message` and `connect.Record` - change the places to the new class * Add the missing class file * - change Map<String, String> to Map<String, Object> for all open calls - add a `RuntimeSink` in functions which extends Sink and provides the flexibility to control acking behavior for pulsar sink * record sequence should be part of RecordContext --- pulsar-client-admin-shaded-for-functions/pom.xml | 7 ++ pulsar-client-admin-shaded/pom.xml | 7 ++ pulsar-client-shaded/pom.xml | 7 ++ pulsar-client/pom.xml | 6 ++ .../org/apache/pulsar/client/impl/MessageImpl.java | 3 +- .../pulsar/client/impl/MessageRecordImpl.java | 65 +++++++++++++++ .../pulsar/client/impl/TopicMessageImpl.java | 9 +- .../pulsar/client/impl/MessageRecordImplTest.java | 60 ++++++++++++++ .../pulsar/connect/aerospike/AerospikeSink.java | 9 +- .../connect/aerospike/AerospikeSinkConfig.java | 2 +- .../pulsar/connect/cassandra/CassandraSink.java | 7 +- .../connect/cassandra/CassandraSinkConfig.java | 2 +- .../org/apache/pulsar/connect/core/PushSource.java | 4 +- .../connect/core/{Source.java => Record.java} | 24 ++---- .../core/{Message.java => RecordContext.java} | 27 +++--- .../java/org/apache/pulsar/connect/core/Sink.java | 6 +- .../org/apache/pulsar/connect/core/Source.java | 2 +- .../org/apache/pulsar/connect/kafka/KafkaSink.java | 9 +- 
.../pulsar/connect/kafka/KafkaSinkConfig.java | 2 +- .../apache/pulsar/connect/kafka/KafkaSource.java | 22 ++--- .../pulsar/connect/kafka/KafkaSourceConfig.java | 2 +- .../pulsar/connect/rabbitmq/RabbitMQConfig.java | 2 +- .../pulsar/connect/rabbitmq/RabbitMQSource.java | 22 ++--- .../pulsar/connect/twitter/TwitterFireHose.java | 16 ++-- .../connect/twitter/TwitterFireHoseConfig.java | 2 +- .../pulsar/functions/sink/DefaultRuntimeSink.java | 46 ++++++++--- .../apache/pulsar/functions/sink/RuntimeSink.java | 52 ++++++++++++ .../functions/sink/DefaultRuntimeSinkTest.java | 96 ++++++++++++++++++++++ pulsar-functions/utils/pom.xml | 4 + 29 files changed, 414 insertions(+), 108 deletions(-) diff --git a/pulsar-client-admin-shaded-for-functions/pom.xml b/pulsar-client-admin-shaded-for-functions/pom.xml index 5953b75..1e38333 100644 --- a/pulsar-client-admin-shaded-for-functions/pom.xml +++ b/pulsar-client-admin-shaded-for-functions/pom.xml @@ -61,6 +61,7 @@ <includes> <include>org.apache.pulsar:pulsar-common</include> <include>org.apache.bookkeeper:circe-checksum</include> + <include>org.apache.pulsar:pulsar-connect-core</include> <include>org.apache.pulsar:pulsar-client-original</include> <include>org.apache.pulsar:pulsar-client-admin-original</include> <!-- client dependencies as below --> @@ -107,6 +108,12 @@ </includes> </filter> <filter> + <artifact>org.apache.pulsar:pulsar-connect-core</artifact> + <includes> + <include>**</include> + </includes> + </filter> + <filter> <artifact>org.apache.pulsar:pulsar-client-original</artifact> <includes> <include>**</include> diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index fcd9349..df5f867 100644 --- a/pulsar-client-admin-shaded/pom.xml +++ b/pulsar-client-admin-shaded/pom.xml @@ -57,6 +57,7 @@ <artifactSet> <includes> + <include>org.apache.pulsar:pulsar-connect-core</include> <include>org.apache.pulsar:pulsar-client-original</include> 
<include>org.apache.pulsar:pulsar-client-admin-original</include> <include>org.apache.commons:commons-lang3</include> @@ -93,6 +94,12 @@ </includes> </filter> <filter> + <artifact>org.apache.pulsar:pulsar-connect-core</artifact> + <includes> + <include>**</include> + </includes> + </filter> + <filter> <artifact>org.apache.pulsar:pulsar-client-original</artifact> <includes> <include>**</include> diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index c67c546..14ace60 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -59,6 +59,7 @@ <artifactSet> <includes> + <include>org.apache.pulsar:pulsar-connect-core</include> <include>org.apache.pulsar:pulsar-client-original</include> <include>org.apache.commons:commons-lang3</include> <include>commons-codec:commons-codec</include> @@ -94,6 +95,12 @@ </includes> </filter> <filter> + <artifact>org.apache.pulsar:pulsar-connect-core</artifact> + <includes> + <include>**</include> + </includes> + </filter> + <filter> <artifact>org.apache.pulsar:pulsar-client-original</artifact> <includes> <include>**</include> diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index 267c5e7..f6b5b40 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -40,6 +40,12 @@ </dependency> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-connect-core</artifactId> + <version>${project.parent.version}</version> + </dependency> + + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index f9ba65c..2989d30 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -43,10 +43,9 @@ import io.netty.buffer.Unpooled; import 
io.netty.util.Recycler; import io.netty.util.Recycler.Handle; -public class MessageImpl<T> implements Message<T> { +public class MessageImpl<T> extends MessageRecordImpl<T, MessageId> { private MessageMetadata.Builder msgMetadataBuilder; - private MessageId messageId; private ClientCnx cnx; private ByteBuf payload; private Schema<T> schema; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java new file mode 100644 index 0000000..5e8128a --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.connect.core.Record; + +/** + * Abstract class that implements message api and connect record api. 
+ */ +public abstract class MessageRecordImpl<T, M extends MessageId> implements Message<T>, Record<T> { + + protected M messageId; + protected java.util.function.Consumer<M> ackFunction; + + public void setAckFunction(java.util.function.Consumer<M> ackFunction) { + this.ackFunction = ackFunction; + } + + @Override + public String getPartitionId() { + if (null != messageId) { + if (messageId instanceof MessageIdImpl) { + return String.valueOf(((MessageIdImpl) messageId).getPartitionIndex()); + } else { + return ""; + } + } + return ""; + } + + @Override + public long getRecordSequence() { + return getSequenceId(); + } + + @Override + public void ack() { + if (null != ackFunction) { + ackFunction.accept((M) getMessageId()); + } + } + + @Override + public void fail() { + // no-op + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index 5ae452e..18e2088 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -23,17 +23,16 @@ import java.util.Map; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -public class TopicMessageImpl<T> implements Message<T> { +public class TopicMessageImpl<T> extends MessageRecordImpl<T, TopicMessageIdImpl> { private final String topicName; private final Message<T> msg; - private final TopicMessageIdImpl msgId; TopicMessageImpl(String topicName, Message<T> msg) { this.topicName = topicName; this.msg = msg; - this.msgId = new TopicMessageIdImpl(topicName, msg.getMessageId()); + this.messageId = new TopicMessageIdImpl(topicName, msg.getMessageId()); } /** @@ -46,11 +45,11 @@ public class TopicMessageImpl<T> implements Message<T> { @Override public MessageId getMessageId() { - return msgId; + return messageId; } public MessageId getInnerMessageId() { 
- return msgId.getInnerMessageId(); + return messageId.getInnerMessageId(); } @Override diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java new file mode 100644 index 0000000..7df373a --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertSame; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pulsar.client.api.MessageId; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class MessageRecordImplTest { + + private MessageRecordImpl<byte[], MessageId> message; + private MessageId messageId; + + @BeforeMethod + public void setup() { + this.messageId = mock(MessageId.class); + this.message = mock(MessageRecordImpl.class, CALLS_REAL_METHODS); + when(message.getMessageId()).thenReturn(messageId); + } + + @Test + public void testAck() throws Exception { + final AtomicReference<MessageId> msgIdRef = new AtomicReference<>(); + final CountDownLatch ackLatch = new CountDownLatch(1); + + this.message.setAckFunction(msgId -> { + msgIdRef.set(msgId); + ackLatch.countDown(); + }); + + this.message.ack(); + ackLatch.await(); + + assertSame(messageId, msgIdRef.get()); + } + +} diff --git a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java index 73faf05..daab766 100644 --- a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java +++ b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java @@ -32,7 +32,6 @@ import com.aerospike.client.listener.WriteListener; import com.aerospike.client.policy.ClientPolicy; import com.aerospike.client.policy.WritePolicy; import org.apache.pulsar.common.util.KeyValue; -import org.apache.pulsar.connect.core.Message; import org.apache.pulsar.connect.core.Sink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +51,7 @@ public class 
AerospikeSink<K, V> implements Sink<KeyValue<K, V>> { private EventLoop eventLoop; @Override - public void open(Map<String, String> config) throws Exception { + public void open(Map<String, Object> config) throws Exception { aerospikeSinkConfig = AerospikeSinkConfig.load(config); if (aerospikeSinkConfig.getSeedHosts() == null || aerospikeSinkConfig.getKeyspace() == null @@ -78,10 +77,10 @@ public class AerospikeSink<K, V> implements Sink<KeyValue<K, V>> { } @Override - public CompletableFuture<Void> write(Message<KeyValue<K, V>> tuple) { + public CompletableFuture<Void> write(KeyValue<K, V> record) { CompletableFuture<Void> future = new CompletableFuture<>(); - Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(), tuple.getData().getKey().toString()); - Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(tuple.getData().getValue())); + Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(), record.getKey().toString()); + Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(record.getValue())); AWriteListener listener = null; try { listener = queue.take(); diff --git a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java index 72f530b..ef02c80 100644 --- a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java +++ b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java @@ -57,7 +57,7 @@ public class AerospikeSinkConfig implements Serializable { return mapper.readValue(new File(yamlFile), AerospikeSinkConfig.class); } - public static AerospikeSinkConfig load(Map<String, String> map) throws IOException { + public static AerospikeSinkConfig load(Map<String, Object> map) throws IOException { ObjectMapper mapper = new ObjectMapper(); 
return mapper.readValue(new ObjectMapper().writeValueAsString(map), AerospikeSinkConfig.class); } diff --git a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java index a003d8f..bc87ec6 100644 --- a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java +++ b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java @@ -23,7 +23,6 @@ import com.datastax.driver.core.*; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import org.apache.pulsar.common.util.KeyValue; -import org.apache.pulsar.connect.core.Message; import org.apache.pulsar.connect.core.Sink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +45,7 @@ public class CassandraSink<K, V> implements Sink<KeyValue<K, V>> { private PreparedStatement statement; @Override - public void open(Map<String, String> config) throws Exception { + public void open(Map<String, Object> config) throws Exception { cassandraSinkConfig = CassandraSinkConfig.load(config); if (cassandraSinkConfig.getRoots() == null || cassandraSinkConfig.getKeyspace() == null @@ -67,8 +66,8 @@ public class CassandraSink<K, V> implements Sink<KeyValue<K, V>> { } @Override - public CompletableFuture<Void> write(Message<KeyValue<K, V>> tuple) { - BoundStatement bound = statement.bind(tuple.getData().getKey(), tuple.getData().getValue()); + public CompletableFuture<Void> write(KeyValue<K, V> record) { + BoundStatement bound = statement.bind(record.getKey(), record.getValue()); ResultSetFuture future = session.executeAsync(bound); CompletableFuture<Void> completable = new CompletableFuture<Void>(); Futures.addCallback(future, diff --git a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java 
b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java index b21c95e..5bcfb52 100644 --- a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java +++ b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java @@ -50,7 +50,7 @@ public class CassandraSinkConfig implements Serializable { return mapper.readValue(new File(yamlFile), CassandraSinkConfig.class); } - public static CassandraSinkConfig load(Map<String, String> map) throws IOException { + public static CassandraSinkConfig load(Map<String, Object> map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(new ObjectMapper().writeValueAsString(map), CassandraSinkConfig.class); } diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java index 1da8f78..4e6f64b 100644 --- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java @@ -41,12 +41,12 @@ public interface PushSource<T> extends AutoCloseable { * @param config initialization config * @throws Exception IO type exceptions when opening a connector */ - void open(final Map<String, String> config) throws Exception; + void open(final Map<String, Object> config) throws Exception; /** * Attach a consumer function to this Source. This is invoked by the implementation * to pass messages whenever there is data to be pushed to Pulsar. 
* @param consumer */ - void setConsumer(Function<Message<T>, CompletableFuture<Void>> consumer); + void setConsumer(Function<Record<T>, CompletableFuture<Void>> consumer); } \ No newline at end of file diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Record.java similarity index 57% copy from pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java copy to pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Record.java index b63a4b7..c5137ea 100644 --- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Record.java @@ -18,22 +18,14 @@ */ package org.apache.pulsar.connect.core; -import java.util.Map; - -public interface Source<T> extends AutoCloseable { - /** - * Open connector with configuration - * - * @param config initialization config - * @throws Exception IO type exceptions when opening a connector - */ - void open(final Map<String, String> config) throws Exception; - +/** + * Pulsar Connect's Record interface. Record encapsulates the + * information about a record being read from a Source. + */ +public interface Record<T> extends RecordContext { /** - * Reads the next message from source, if one exists, and returns. This call should be non-blocking. - * If source does not have any new messages, return null immediately. - * @return next message from source or null, if no new messages are available. 
- * @throws Exception + * Retrieves the actual data of the record + * @return The record data */ - Message<T> read() throws Exception; + T getValue(); } diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/RecordContext.java similarity index 60% rename from pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java rename to pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/RecordContext.java index 1fcdb25..094ca8c 100644 --- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/RecordContext.java @@ -19,35 +19,30 @@ package org.apache.pulsar.connect.core; /** - * Pulsar Connect's Message interface. Message encapsulates the - * information about a message being read/written from/to a Source/Sink. + * A source context that can be used by the runtime to interact with source. */ -public interface Message<T> { +public interface RecordContext { + /** - * Retrieves the partition information if any of the message + * Retrieves the partition information if any of the record. * @return The partition id where the */ default String getPartitionId() { return null; } /** - * Retrieves the sequence id of the message - * @return Sequence Id associated with the message + * Retrieves the sequence of the record from a source partition. 
+ * @return Sequence Id associated with the record */ - default Long getSequenceId() { return -1L; } + default long getRecordSequence() { return -1L; } /** - * Retrieves the actual data of the message - * @return The message data + * Acknowledge that this record is fully processed */ - T getData(); + default void ack() {} /** - * Acknowledge that this message is fully processed + * To indicate that this record has failed to be processed */ - default void ack() {}; + default void fail() {} - /** - * To indicate that this message has failed to be processed - */ - default void fail() {}; } diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java index f7d1b7b..ca569e7 100644 --- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java @@ -37,13 +37,13 @@ public interface Sink<T> extends AutoCloseable { * @param config initialization config * @throws Exception IO type exceptions when opening a connector */ - void open(final Map<String, String> config) throws Exception; + void open(final Map<String, Object> config) throws Exception; /** * Attempt to publish a type safe collection of messages * - * @param message Object to publish to the sink + * @param value output value * @return Completable future fo async publish request */ - CompletableFuture<Void> write(final Message<T> message); + CompletableFuture<Void> write(T value); } \ No newline at end of file diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java index b63a4b7..40f1820 100644 --- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java @@ -35,5 +35,5 @@ public interface Source<T> extends 
AutoCloseable { * @return next message from source or null, if no new messages are available. * @throws Exception */ - Message<T> read() throws Exception; + Record<T> read() throws Exception; } diff --git a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java index 18919f4..fc8e2af 100644 --- a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java +++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.pulsar.common.util.KeyValue; -import org.apache.pulsar.connect.core.Message; import org.apache.pulsar.connect.core.Sink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,9 +47,9 @@ public class KafkaSink<K, V> implements Sink<KeyValue<K, V>> { private KafkaSinkConfig kafkaSinkConfig; @Override - public CompletableFuture<Void> write(Message<KeyValue<K, V>> message) { - ProducerRecord<K, V> record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getData().getKey(), message.getData().getValue()); - LOG.debug("Message sending to kafka, record={}.", record); + public CompletableFuture<Void> write(KeyValue<K, V> message) { + ProducerRecord<K, V> record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(), message.getValue()); + LOG.debug("Record sending to kafka, record={}.", record); Future f = producer.send(record); return CompletableFuture.supplyAsync(() -> { try { @@ -69,7 +68,7 @@ public class KafkaSink<K, V> implements Sink<KeyValue<K, V>> { } @Override - public void open(Map<String, String> config) throws Exception { + public void open(Map<String, Object> config) throws Exception { kafkaSinkConfig = KafkaSinkConfig.load(config); if (kafkaSinkConfig.getTopic() 
== null || kafkaSinkConfig.getBootstrapServers() == null diff --git a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java index 45aea78..6da494e 100644 --- a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java +++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java @@ -52,7 +52,7 @@ public class KafkaSinkConfig implements Serializable { return mapper.readValue(new File(yamlFile), KafkaSinkConfig.class); } - public static KafkaSinkConfig load(Map<String, String> map) throws IOException { + public static KafkaSinkConfig load(Map<String, Object> map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(new ObjectMapper().writeValueAsString(map), KafkaSinkConfig.class); } diff --git a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java index f9bb8c1..7520ee6 100644 --- a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java +++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java @@ -24,7 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.pulsar.connect.core.Message; +import org.apache.pulsar.connect.core.Record; import org.apache.pulsar.connect.core.PushSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +45,10 @@ public class KafkaSource<V> implements PushSource<V> { private KafkaSourceConfig kafkaSourceConfig; Thread runnerThread; - private java.util.function.Function<Message<V>, CompletableFuture<Void>> consumeFunction; + private 
java.util.function.Function<Record<V>, CompletableFuture<Void>> consumeFunction; @Override - public void open(Map<String, String> config) throws Exception { + public void open(Map<String, Object> config) throws Exception { kafkaSourceConfig = KafkaSourceConfig.load(config); if (kafkaSourceConfig.getTopic() == null || kafkaSourceConfig.getBootstrapServers() == null @@ -101,8 +101,8 @@ public class KafkaSource<V> implements PushSource<V> { CompletableFuture<?>[] futures = new CompletableFuture<?>[records.count()]; int index = 0; for (ConsumerRecord<String, V> record : records) { - LOG.debug("Message received from kafka, key: {}. value: {}", record.key(), record.value()); - futures[index] = consumeFunction.apply(new KafkaMesssage<>(record)); + LOG.debug("Record received from kafka, key: {}. value: {}", record.key(), record.value()); + futures[index] = consumeFunction.apply(new KafkaRecord<>(record)); index++; } if (!kafkaSourceConfig.isAutoCommitEnabled()) { @@ -121,14 +121,14 @@ public class KafkaSource<V> implements PushSource<V> { } @Override - public void setConsumer(java.util.function.Function<Message<V>, CompletableFuture<Void>> consumeFunction) { + public void setConsumer(java.util.function.Function<Record<V>, CompletableFuture<Void>> consumeFunction) { this.consumeFunction = consumeFunction; } - static private class KafkaMesssage<V> implements Message<V> { - ConsumerRecord<String, V> record; + static private class KafkaRecord<V> implements Record<V> { + private final ConsumerRecord<String, V> record; - public KafkaMesssage(ConsumerRecord<String, V> record) { + public KafkaRecord(ConsumerRecord<String, V> record) { this.record = record; } @@ -138,12 +138,12 @@ public class KafkaSource<V> implements PushSource<V> { } @Override - public Long getSequenceId() { + public long getRecordSequence() { return record.offset(); } @Override - public V getData() { + public V getValue() { return record.value(); } } diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java index 77fd77b..0d41b1b 100644 --- a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java +++ b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java @@ -54,7 +54,7 @@ public class KafkaSourceConfig implements Serializable { return mapper.readValue(new File(yamlFile), KafkaSourceConfig.class); } - public static KafkaSourceConfig load(Map<String, String> map) throws IOException { + public static KafkaSourceConfig load(Map<String, Object> map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(new ObjectMapper().writeValueAsString(map), KafkaSourceConfig.class); } diff --git a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java index 1d7268e..e76b03f 100644 --- a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java +++ b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java @@ -48,7 +48,7 @@ public class RabbitMQConfig implements Serializable { return mapper.readValue(new File(yamlFile), RabbitMQConfig.class); } - public static RabbitMQConfig load(Map<String, String> map) throws IOException { + public static RabbitMQConfig load(Map<String, Object> map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(new ObjectMapper().writeValueAsString(map), RabbitMQConfig.class); } diff --git a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java index 847791a..c548f99 100644 --- 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java +++ b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java @@ -20,7 +20,7 @@ package org.apache.pulsar.connect.rabbitmq; import com.rabbitmq.client.*; -import org.apache.pulsar.connect.core.Message; +import org.apache.pulsar.connect.core.Record; import org.apache.pulsar.connect.core.PushSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,18 +37,18 @@ public class RabbitMQSource implements PushSource<byte[]> { private static Logger logger = LoggerFactory.getLogger(RabbitMQSource.class); - private Function<Message<byte[]>, CompletableFuture<Void>> consumer; + private Function<Record<byte[]>, CompletableFuture<Void>> consumer; private Connection rabbitMQConnection; private Channel rabbitMQChannel; private RabbitMQConfig rabbitMQConfig; @Override - public void setConsumer(Function<Message<byte[]>, CompletableFuture<Void>> consumeFunction) { + public void setConsumer(Function<Record<byte[]>, CompletableFuture<Void>> consumeFunction) { this.consumer = consumeFunction; } @Override - public void open(Map<String, String> config) throws Exception { + public void open(Map<String, Object> config) throws Exception { rabbitMQConfig = RabbitMQConfig.load(config); if (rabbitMQConfig.getAmqUri() == null || rabbitMQConfig.getQueueName() == null) { @@ -75,28 +75,28 @@ public class RabbitMQSource implements PushSource<byte[]> { } private class RabbitMQConsumer extends DefaultConsumer { - private Function<Message<byte[]>, CompletableFuture<Void>> consumeFunction; + private Function<Record<byte[]>, CompletableFuture<Void>> consumeFunction; - public RabbitMQConsumer(Function<Message<byte[]>, CompletableFuture<Void>> consumeFunction, Channel channel) { + public RabbitMQConsumer(Function<Record<byte[]>, CompletableFuture<Void>> consumeFunction, Channel channel) { super(channel); this.consumeFunction = consumeFunction; } @Override public 
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - consumeFunction.apply(new RabbitMQMessage(body)); + consumeFunction.apply(new RabbitMQRecord(body)); } } - static private class RabbitMQMessage implements Message<byte[]> { - private byte[] data; + static private class RabbitMQRecord implements Record<byte[]> { + private final byte[] data; - public RabbitMQMessage(byte[] data) { + public RabbitMQRecord(byte[] data) { this.data = data; } @Override - public byte[] getData() { + public byte[] getValue() { return data; } } diff --git a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java index 3f2316e..1dcbb17 100644 --- a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java +++ b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Function; -import org.apache.pulsar.connect.core.Message; +import org.apache.pulsar.connect.core.Record; import org.apache.pulsar.connect.core.PushSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,10 +52,10 @@ public class TwitterFireHose implements PushSource<String> { // ----- Runtime fields private Object waitObject; - private Function<Message<String>, CompletableFuture<Void>> consumeFunction; + private Function<Record<String>, CompletableFuture<Void>> consumeFunction; @Override - public void open(Map<String, String> config) throws IOException { + public void open(Map<String, Object> config) throws IOException { TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config); if (hoseConfig.getConsumerKey() == null || hoseConfig.getConsumerSecret() == null @@ -68,7 +68,7 @@ public class TwitterFireHose implements 
PushSource<String> { } @Override - public void setConsumer(Function<Message<String>, CompletableFuture<Void>> consumeFunction) { + public void setConsumer(Function<Record<String>, CompletableFuture<Void>> consumeFunction) { this.consumeFunction = consumeFunction; } @@ -127,7 +127,7 @@ public class TwitterFireHose implements PushSource<String> { // We don't really care if the future succeeds or not. // However might be in the future to count failures // TODO:- Figure out the metrics story for connectors - consumeFunction.apply(new TwitterMessage(line)); + consumeFunction.apply(new TwitterRecord(line)); } catch (Exception e) { LOG.error("Exception thrown"); } @@ -165,15 +165,15 @@ public class TwitterFireHose implements PushSource<String> { } } - static private class TwitterMessage implements Message<String> { + static private class TwitterRecord implements Record<String> { private String tweet; - public TwitterMessage(String tweet) { + public TwitterRecord(String tweet) { this.tweet = tweet; } @Override - public String getData() { + public String getValue() { return tweet; } } diff --git a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java index f0614bd..57782e2 100644 --- a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java +++ b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java @@ -56,7 +56,7 @@ public class TwitterFireHoseConfig implements Serializable { return mapper.readValue(new File(yamlFile), TwitterFireHoseConfig.class); } - public static TwitterFireHoseConfig load(Map<String, String> map) throws IOException { + public static TwitterFireHoseConfig load(Map<String, Object> map) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
TwitterFireHoseConfig.class); } diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java similarity index 58% copy from pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java copy to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java index f7d1b7b..8e0d37a 100644 --- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java @@ -16,34 +16,54 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.connect.core; +package org.apache.pulsar.functions.sink; import java.util.Map; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.connect.core.RecordContext; +import org.apache.pulsar.connect.core.Sink; /** - * Pulsar's Sink interface. Sink read data from - * a Pulsar topic and write it to external sinks(kv store, database, filesystem ,etc) - * The lifcycle of a Sink is to open it passing any config needed - * by it to initialize(like open network connection, authenticate, etc). - * On every message from the designated PulsarTopic, the write method is - * invoked which writes the message to the external sink. One can use close - * at the end of the session to do any cleanup + * The default implementation of runtime sink. 
+ * + * @param <T> */ -public interface Sink<T> extends AutoCloseable { +public class DefaultRuntimeSink<T> implements RuntimeSink<T> { + + public static <T> DefaultRuntimeSink<T> of(Sink<T> sink) { + return new DefaultRuntimeSink<>(sink); + } + + private final Sink<T> sink; + + private DefaultRuntimeSink(Sink<T> sink) { + this.sink = sink; + } + /** * Open connector with configuration * * @param config initialization config * @throws Exception IO type exceptions when opening a connector */ - void open(final Map<String, String> config) throws Exception; + @Override + public void open(final Map<String, Object> config) throws Exception { + sink.open(config); + } /** * Attempt to publish a type safe collection of messages * - * @param message Object to publish to the sink + * @param value output value * @return Completable future fo async publish request */ - CompletableFuture<Void> write(final Message<T> message); -} \ No newline at end of file + @Override + public CompletableFuture<Void> write(T value) { + return sink.write(value); + } + + @Override + public void close() throws Exception { + sink.close(); + } +} diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java new file mode 100644 index 0000000..63a48ec --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.sink; + +import org.apache.pulsar.connect.core.RecordContext; +import org.apache.pulsar.connect.core.Sink; + +/** + * This class extends connect sink. + * + * <p>The runtime should interact with this sink rather than directly with the public {@link Sink} interface. + * + * <p>There is a default implementation provided for wrapping up the user provided {@link Sink}. Pulsar sink + * should be implemented using this interface to ensure supporting effective-once. + */ +public interface RuntimeSink<T> extends Sink<T> { + + /** + * Write the <tt>value</tt>. + * + * <p>The implementation of this class is responsible for notifying the runtime whether the input record + * for generating this value is done with processing by {@link RecordContext#ack} and {@link RecordContext#fail}. + * + * @param inputRecordContext input record context + * @param value output value computed from the runtime. 
+ */ + default void write(RecordContext inputRecordContext, T value) { + write(value) + .thenAccept(ignored -> inputRecordContext.ack()) + .exceptionally(cause -> { + inputRecordContext.fail(); + return null; + }); + } + +} diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java new file mode 100644 index 0000000..7c58c30 --- /dev/null +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.functions.sink; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.connect.core.RecordContext; +import org.apache.pulsar.connect.core.Sink; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Unit test {@link DefaultRuntimeSink}. + */ +public class DefaultRuntimeSinkTest { + + private Sink<String> mockSink; + private RuntimeSink<String> runtimeSink; + + @BeforeMethod + public void setup() { + this.mockSink = mock(Sink.class); + this.runtimeSink = DefaultRuntimeSink.of(mockSink); + } + + @Test + public void testOpen() throws Exception { + this.runtimeSink.open(Collections.emptyMap()); + + verify(mockSink, times(1)).open(any(Map.class)); + } + + @Test + public void testClose() throws Exception { + this.runtimeSink.close(); + + verify(mockSink, times(1)).close(); + } + + @Test + public void testWrite() { + this.runtimeSink.write("test-record"); + verify(mockSink, times(1)).write(eq("test-record")); + } + + @Test + public void testWriteAck() { + RecordContext context = mock(RecordContext.class); + + CompletableFuture<Void> writeFuture = new CompletableFuture<>(); + writeFuture.complete(null); + when(mockSink.write(anyString())).thenReturn(writeFuture); + + runtimeSink.write(context, "test-record"); + + verify(context, times(1)).ack(); + } + + @Test + public void testWriteFail() { + RecordContext context = mock(RecordContext.class); + + CompletableFuture<Void> writeFuture = new CompletableFuture<>(); + writeFuture.completeExceptionally(new Exception("test-exception")); + when(mockSink.write(anyString())).thenReturn(writeFuture); 
+ + runtimeSink.write(context, "test-record"); + + verify(context, times(1)).fail(); + } +} diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml index e4bde12..94a2c50 100644 --- a/pulsar-functions/utils/pom.xml +++ b/pulsar-functions/utils/pom.xml @@ -49,6 +49,10 @@ </exclusion> <exclusion> <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-connect-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-original</artifactId> </exclusion> <exclusion> -- To stop receiving notification emails like this one, please contact si...@apache.org.