[FLINK-1638] [streaming] Added connector for low level Kafka Consumer API

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ccb87cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ccb87cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ccb87cf

Branch: refs/heads/master
Commit: 0ccb87cf76af1ea662c8209c9f083eb16c51d8e8
Parents: 452c39a
Author: mbalassi <mbala...@apache.org>
Authored: Tue Mar 3 21:14:58 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerExample.java  |   4 +-
 .../connectors/kafka/api/KafkaSource.java       |  10 +-
 .../kafka/api/simple/KafkaConsumerIterator.java | 221 +++++++++++++++++++
 .../KafkaDeserializingConsumerIterator.java     |  35 +++
 .../kafka/api/simple/MessageWithOffset.java     |  44 ++++
 .../kafka/api/simple/SimpleKafkaSource.java     |  68 ++++++
 6 files changed, 379 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index 20c9bd7..587d7b1 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
+import 
org.apache.flink.streaming.connectors.kafka.api.simple.SimpleKafkaSource;
 import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
 
 public class KafkaConsumerExample {
@@ -39,7 +40,8 @@ public class KafkaConsumerExample {
                @SuppressWarnings("unused")
                DataStream<String> stream1 = env
                                .addSource(
-                                               new KafkaSource<String>(host + 
":" + port, topic, new JavaDefaultStringSchema()))
+//                                             new KafkaSource<String>(host + 
":" + port, topic, new JavaDefaultStringSchema()))
+                                               new 
SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
                                .setParallelism(3)
                                .print().setParallelism(3);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index f4097e0..3075608 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -52,6 +52,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 
        private long zookeeperSyncTimeMillis;
        private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
+       private static final String DEFAULT_GROUP_ID = "flink-group";
 
        private volatile boolean isRunning = false;
 
@@ -67,16 +68,21 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
         * @param zookeeperSyncTimeMillis
         *            Synchronization time with zookeeper.
         */
-       public KafkaSource(String zookeeperHost, String topicId,
+       public KafkaSource(String zookeeperHost, String topicId, String groupId,
                        DeserializationSchema<OUT> deserializationSchema, long 
zookeeperSyncTimeMillis) {
                super(deserializationSchema);
                this.zookeeperHost = zookeeperHost;
-               this.groupId = "flink-group";
+               this.groupId = groupId;
                this.topicId = topicId;
                this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
        }
 
        public KafkaSource(String zookeeperHost, String topicId,
+                                          DeserializationSchema<OUT> 
deserializationSchema, long zookeeperSyncTimeMillis){
+               this(zookeeperHost, topicId, DEFAULT_GROUP_ID, 
deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
+       }
+
+       public KafkaSource(String zookeeperHost, String topicId,
                        DeserializationSchema<OUT> deserializationSchema) {
                this(zookeeperHost, topicId, deserializationSchema, 
ZOOKEEPER_DEFAULT_SYNC_TIME);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
new file mode 100644
index 0000000..6a01e43
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+
+public class KafkaConsumerIterator {
+
+       private static final long serialVersionUID = 1L;
+
+       private List<String> hosts;
+       private String topic;
+       private int port;
+       private int partition;
+       private long readOffset;
+       private long waitOnEmptyFetch;
+       private transient SimpleConsumer consumer;
+       private List<String> replicaBrokers;
+       private String clientName;
+
+       private transient Iterator<MessageAndOffset> iter;
+       private transient FetchResponse fetchResponse;
+
+       public KafkaConsumerIterator(String host, int port, String topic, int 
partition,
+                       long waitOnEmptyFetch) {
+
+               this.hosts = new ArrayList<String>();
+               hosts.add(host);
+               this.port = port;
+
+               this.topic = topic;
+               this.partition = partition;
+               this.waitOnEmptyFetch = waitOnEmptyFetch;
+
+               replicaBrokers = new ArrayList<String>();
+       }
+
+       private void initialize() {
+               PartitionMetadata metadata;
+               do {
+                       metadata = findLeader(hosts, port, topic, partition);
+                       try {
+                               Thread.sleep(1000L);
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                       }
+               } while (metadata == null);
+
+               if (metadata.leader() == null) {
+                       throw new RuntimeException("Can't find Leader for Topic 
and Partition. (at " + hosts.get(0)
+                                       + ":" + port);
+               }
+
+               String leadBroker = metadata.leader().host();
+               clientName = "Client_" + topic + "_" + partition;
+
+               consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 
1024, clientName);
+       }
+
+       public void initializeFromBeginning() {
+               initialize();
+               readOffset = getLastOffset(consumer, topic, partition,
+                               kafka.api.OffsetRequest.EarliestTime(), 
clientName);
+
+               resetFetchResponse(readOffset);
+       }
+
+       public void initializeFromCurrent() {
+               initialize();
+               readOffset = getLastOffset(consumer, topic, partition,
+                               kafka.api.OffsetRequest.LatestTime(), 
clientName);
+
+               resetFetchResponse(readOffset);
+       }
+
+       public void initializeFromOffset(long offset) {
+               initialize();
+               readOffset = offset;
+               resetFetchResponse(readOffset);
+       }
+
+       public boolean hasNext() {
+               return true;
+       }
+
+       public byte[] next() {
+               return nextWithOffset().getMessage();
+       }
+
+       private void resetFetchResponse(long offset) {
+               FetchRequest req = new 
FetchRequestBuilder().clientId(clientName)
+                               .addFetch(topic, partition, offset, 
100000).build();
+               fetchResponse = consumer.fetch(req);
+               iter = fetchResponse.messageSet(topic, partition).iterator();
+       }
+
+       public MessageWithOffset nextWithOffset() {
+
+               synchronized (fetchResponse) {
+                       while (!iter.hasNext()) {
+                               resetFetchResponse(readOffset);
+                               try {
+                                       Thread.sleep(waitOnEmptyFetch);
+                               } catch (InterruptedException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+
+                       MessageAndOffset messageAndOffset = iter.next();
+                       long currentOffset = messageAndOffset.offset();
+
+                       while (currentOffset < readOffset) {
+                               messageAndOffset = iter.next();
+                               currentOffset = messageAndOffset.offset();
+                       }
+
+                       readOffset = messageAndOffset.nextOffset();
+                       ByteBuffer payload = 
messageAndOffset.message().payload();
+
+                       byte[] bytes = new byte[payload.limit()];
+                       payload.get(bytes);
+                       return new MessageWithOffset(messageAndOffset.offset(), 
bytes);
+               }
+       }
+
+       public void reset(long offset) {
+               synchronized (fetchResponse) {
+                       readOffset = offset;
+                       resetFetchResponse(offset);
+               }
+       }
+
+       private PartitionMetadata findLeader(List<String> a_hosts, int a_port, 
String a_topic,
+                       int a_partition) {
+               PartitionMetadata returnMetaData = null;
+               loop: for (String seed : a_hosts) {
+                       SimpleConsumer consumer = null;
+                       try {
+                               consumer = new SimpleConsumer(seed, a_port, 
100000, 64 * 1024, "leaderLookup");
+                               List<String> topics = 
Collections.singletonList(a_topic);
+                               TopicMetadataRequest req = new 
TopicMetadataRequest(topics);
+                               kafka.javaapi.TopicMetadataResponse resp = 
consumer.send(req);
+
+                               List<TopicMetadata> metaData = 
resp.topicsMetadata();
+                               for (TopicMetadata item : metaData) {
+                                       for (PartitionMetadata part : 
item.partitionsMetadata()) {
+                                               if (part.partitionId() == 
a_partition) {
+                                                       returnMetaData = part;
+                                                       break loop;
+                                               }
+                                       }
+                               }
+                       } catch (Exception e) {
+                               throw new RuntimeException("Error communicating 
with Broker [" + seed
+                                               + "] to find Leader for [" + 
a_topic + ", " + a_partition + "] Reason: "
+                                               + e);
+                       } finally {
+                               if (consumer != null) {
+                                       consumer.close();
+                               }
+                       }
+               }
+               if (returnMetaData != null) {
+                       replicaBrokers.clear();
+                       for (kafka.cluster.Broker replica : 
returnMetaData.replicas()) {
+                               replicaBrokers.add(replica.host());
+                       }
+               }
+               return returnMetaData;
+       }
+
+       private static long getLastOffset(SimpleConsumer consumer, String 
topic, int partition,
+                       long whichTime, String clientName) {
+               TopicAndPartition topicAndPartition = new 
TopicAndPartition(topic, partition);
+               Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo 
= new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+               requestInfo.put(topicAndPartition, new 
PartitionOffsetRequestInfo(whichTime, 1));
+               kafka.javaapi.OffsetRequest request = new 
kafka.javaapi.OffsetRequest(requestInfo,
+                               kafka.api.OffsetRequest.CurrentVersion(), 
clientName);
+               OffsetResponse response = consumer.getOffsetsBefore(request);
+
+               if (response.hasError()) {
+                       throw new RuntimeException("Error fetching data Offset 
Data the Broker. Reason: "
+                                       + response.errorCode(topic, partition));
+               }
+               long[] offsets = response.offsets(topic, partition);
+               return offsets[0];
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
new file mode 100644
index 0000000..e1d02ef
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+
+public class KafkaDeserializingConsumerIterator<IN> extends 
KafkaConsumerIterator {
+
+       private DeserializationSchema<IN> deserializationSchema;
+
+       public KafkaDeserializingConsumerIterator(String host, int port, String 
topic, int partition, long waitOnEmptyFetch, DeserializationSchema<IN> 
deserializationSchema) {
+               super(host, port, topic, partition, waitOnEmptyFetch);
+               this.deserializationSchema = deserializationSchema;
+       }
+
+       public IN nextRecord() {
+               return deserializationSchema.deserialize(next());
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
new file mode 100644
index 0000000..6b8f4dd
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
/**
 * Simple mutable holder pairing a raw Kafka message payload with the offset it
 * was read from.
 */
public class MessageWithOffset {

	private long offset;
	private byte[] message;

	/**
	 * @param offset offset the message was read from
	 * @param message raw message payload
	 */
	public MessageWithOffset(long offset, byte[] message) {
		this.offset = offset;
		this.message = message;
	}

	public long getOffset() {
		return offset;
	}

	public byte[] getMessage() {
		return message;
	}

	public void setOffset(long offset) {
		this.offset = offset;
	}

	public void setMessage(byte[] message) {
		this.message = message;
	}
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
new file mode 100644
index 0000000..db75571
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.util.Collector;
+
+public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
+       private static final long serialVersionUID = 1L;
+
+       private String topicId;
+       private final String host;
+       private final int port;
+       private KafkaConsumerIterator iterator;
+
+       /**
+        * Partition index is set automatically by instance id.
+        * @param topicId
+        * @param host
+        * @param port
+        */
+       public SimpleKafkaSource(String topicId,
+                                                        String host, int port, 
DeserializationSchema<OUT> deserializationSchema) {
+               super(deserializationSchema);
+               this.topicId = topicId;
+               this.host = host;
+               this.port = port;
+       }
+
+       private void initializeConnection() {
+               int partitionIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+               iterator = new KafkaConsumerIterator(host, port, topicId, 0, 
100L);
+               iterator.initializeFromCurrent();
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void invoke(Collector<OUT> collector) throws Exception {
+               while (iterator.hasNext()) {
+                       MessageWithOffset msg = iterator.nextWithOffset();
+                       OUT out = schema.deserialize(msg.getMessage());
+                       collector.collect(out);
+               }
+       }
+
+       @Override
+       public void open(Configuration config) {
+               initializeConnection();
+       }
+}

Reply via email to