[FLINK-1638] [streaming] Added connector for low level Kafka Consumer API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ccb87cf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ccb87cf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ccb87cf Branch: refs/heads/master Commit: 0ccb87cf76af1ea662c8209c9f083eb16c51d8e8 Parents: 452c39a Author: mbalassi <mbala...@apache.org> Authored: Tue Mar 3 21:14:58 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 10 14:58:49 2015 +0100 ---------------------------------------------------------------------- .../connectors/kafka/KafkaConsumerExample.java | 4 +- .../connectors/kafka/api/KafkaSource.java | 10 +- .../kafka/api/simple/KafkaConsumerIterator.java | 221 +++++++++++++++++++ .../KafkaDeserializingConsumerIterator.java | 35 +++ .../kafka/api/simple/MessageWithOffset.java | 44 ++++ .../kafka/api/simple/SimpleKafkaSource.java | 68 ++++++ 6 files changed, 379 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java index 20c9bd7..587d7b1 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java @@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.api.KafkaSource; +import org.apache.flink.streaming.connectors.kafka.api.simple.SimpleKafkaSource; import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema; public class KafkaConsumerExample { @@ -39,7 +40,8 @@ public class KafkaConsumerExample { @SuppressWarnings("unused") DataStream<String> stream1 = env .addSource( - new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema())) +// new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema())) + new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema())) .setParallelism(3) .print().setParallelism(3); http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java index f4097e0..3075608 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java @@ -52,6 +52,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> { private long zookeeperSyncTimeMillis; private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200; + private static final String DEFAULT_GROUP_ID = "flink-group"; private volatile 
boolean isRunning = false;

/**
 * Creates a KafkaSource that connects to the given Zookeeper host and consumes the given topic.
 *
 * @param zookeeperHost Address of the Zookeeper host (with port number).
 * @param topicId ID of the Kafka topic to read from.
 * @param groupId Consumer group id used towards Kafka.
 * @param deserializationSchema User provided deserialization schema.
 * @param zookeeperSyncTimeMillis Synchronization time with zookeeper.
 */
public KafkaSource(String zookeeperHost, String topicId, String groupId,
		DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
	super(deserializationSchema);
	this.zookeeperHost = zookeeperHost;
	this.groupId = groupId;
	this.topicId = topicId;
	this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
}

/**
 * Convenience constructor using the default consumer group id.
 *
 * <p>BUG FIX: the given {@code zookeeperSyncTimeMillis} used to be silently ignored
 * (the default sync time was always passed on); the argument is now forwarded.
 */
public KafkaSource(String zookeeperHost, String topicId,
		DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
	this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis);
}

/**
 * Convenience constructor using the default consumer group id and the default
 * Zookeeper synchronization time.
 */
public KafkaSource(String zookeeperHost, String topicId, DeserializationSchema<OUT> deserializationSchema) {
	this(zookeeperHost, topicId, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka.api.simple;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * Iterator over a single Kafka topic partition, based on Kafka's low level
 * (Simple) Consumer API. One of the {@code initializeFrom*} methods must be
 * called before messages are fetched.
 *
 * <p>This is an infinite iterator: {@link #hasNext()} always returns true and
 * {@link #next()} blocks (polling every {@code waitOnEmptyFetch} ms) until a
 * message is available.
 */
public class KafkaConsumerIterator {

	// NOTE(review): the original carried a serialVersionUID and transient fields although
	// the class is not Serializable; both have been dropped as they had no effect.

	private static final int CONSUMER_SO_TIMEOUT = 100000;
	private static final int CONSUMER_BUFFER_SIZE = 64 * 1024;
	private static final int FETCH_SIZE = 100000;
	private static final long LEADER_LOOKUP_RETRY_MILLIS = 1000L;

	private final List<String> hosts;
	private final String topic;
	private final int port;
	private final int partition;
	private final long waitOnEmptyFetch;
	private final List<String> replicaBrokers;

	private long readOffset;
	private SimpleConsumer consumer;
	private String clientName;

	// Guards readOffset, fetchResponse and iter. A dedicated lock object is used because
	// fetchResponse itself is re-assigned while the lock is held; synchronizing on the
	// field's value (as the original code did) would make concurrent callers lock on
	// different objects once the field is replaced.
	private final Object fetchLock = new Object();

	private Iterator<MessageAndOffset> iter;
	private FetchResponse fetchResponse;

	/**
	 * Creates an iterator for the given topic partition.
	 *
	 * @param host Broker host used for leader lookup.
	 * @param port Broker port.
	 * @param topic Topic to read.
	 * @param partition Partition index to read.
	 * @param waitOnEmptyFetch Pause in milliseconds between fetch attempts when no data is available.
	 */
	public KafkaConsumerIterator(String host, int port, String topic, int partition,
			long waitOnEmptyFetch) {

		this.hosts = new ArrayList<String>();
		hosts.add(host);
		this.port = port;

		this.topic = topic;
		this.partition = partition;
		this.waitOnEmptyFetch = waitOnEmptyFetch;

		replicaBrokers = new ArrayList<String>();
	}

	/**
	 * Looks up the partition leader (retrying until the metadata becomes available)
	 * and opens a SimpleConsumer connection to it.
	 */
	private void initialize() {
		PartitionMetadata metadata = findLeader(hosts, port, topic, partition);
		// Retry until the leader metadata is available. The original code slept one
		// second even when the very first lookup succeeded; now we only sleep between
		// retries.
		while (metadata == null) {
			try {
				Thread.sleep(LEADER_LOOKUP_RETRY_MILLIS);
			} catch (InterruptedException e) {
				// restore the interrupt flag so the enclosing task can observe the interruption
				Thread.currentThread().interrupt();
			}
			metadata = findLeader(hosts, port, topic, partition);
		}

		if (metadata.leader() == null) {
			throw new RuntimeException("Can't find Leader for Topic and Partition. (at "
					+ hosts.get(0) + ":" + port + ")");
		}

		String leadBroker = metadata.leader().host();
		clientName = "Client_" + topic + "_" + partition;

		consumer = new SimpleConsumer(leadBroker, port, CONSUMER_SO_TIMEOUT,
				CONSUMER_BUFFER_SIZE, clientName);
	}

	/** Connects and positions the iterator at the earliest available offset. */
	public void initializeFromBeginning() {
		initialize();
		readOffset = getLastOffset(consumer, topic, partition,
				kafka.api.OffsetRequest.EarliestTime(), clientName);

		resetFetchResponse(readOffset);
	}

	/** Connects and positions the iterator at the current (latest) offset. */
	public void initializeFromCurrent() {
		initialize();
		readOffset = getLastOffset(consumer, topic, partition,
				kafka.api.OffsetRequest.LatestTime(), clientName);

		resetFetchResponse(readOffset);
	}

	/** Connects and positions the iterator at the given offset. */
	public void initializeFromOffset(long offset) {
		initialize();
		readOffset = offset;
		resetFetchResponse(readOffset);
	}

	/** Always true: the stream of messages is unbounded. */
	public boolean hasNext() {
		return true;
	}

	/** Blocks until the next message is available and returns its payload. */
	public byte[] next() {
		return nextWithOffset().getMessage();
	}

	/** Issues a new fetch request starting at the given offset. Must be called under fetchLock. */
	private void resetFetchResponse(long offset) {
		FetchRequest req = new FetchRequestBuilder().clientId(clientName)
				.addFetch(topic, partition, offset, FETCH_SIZE).build();
		fetchResponse = consumer.fetch(req);
		iter = fetchResponse.messageSet(topic, partition).iterator();
	}

	/**
	 * Blocks until the next message is available and returns it together with its offset.
	 */
	public MessageWithOffset nextWithOffset() {

		synchronized (fetchLock) {
			while (!iter.hasNext()) {
				resetFetchResponse(readOffset);
				try {
					Thread.sleep(waitOnEmptyFetch);
				} catch (InterruptedException e) {
					// restore the interrupt flag so the enclosing task can observe the interruption
					Thread.currentThread().interrupt();
				}
			}

			MessageAndOffset messageAndOffset = iter.next();
			long currentOffset = messageAndOffset.offset();

			// The broker may return messages that precede the requested offset;
			// skip forward until we reach readOffset.
			while (currentOffset < readOffset) {
				messageAndOffset = iter.next();
				currentOffset = messageAndOffset.offset();
			}

			readOffset = messageAndOffset.nextOffset();
			ByteBuffer payload = messageAndOffset.message().payload();

			byte[] bytes = new byte[payload.limit()];
			payload.get(bytes);
			return new MessageWithOffset(messageAndOffset.offset(), bytes);
		}
	}

	/** Repositions the iterator at the given offset. */
	public void reset(long offset) {
		synchronized (fetchLock) {
			readOffset = offset;
			resetFetchResponse(offset);
		}
	}

	/**
	 * Queries the given brokers for the leader of the topic partition, recording the
	 * replica brokers as a side effect. Returns null when no leader was found.
	 */
	private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_topic,
			int a_partition) {
		PartitionMetadata returnMetaData = null;
		loop: for (String seed : a_hosts) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, a_port, CONSUMER_SO_TIMEOUT,
						CONSUMER_BUFFER_SIZE, "leaderLookup");
				List<String> topics = Collections.singletonList(a_topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						if (part.partitionId() == a_partition) {
							returnMetaData = part;
							break loop;
						}
					}
				}
			} catch (Exception e) {
				// keep the original exception as the cause instead of flattening it to a string
				throw new RuntimeException("Error communicating with Broker [" + seed
						+ "] to find Leader for [" + a_topic + ", " + a_partition + "]", e);
			} finally {
				if (consumer != null) {
					consumer.close();
				}
			}
		}
		if (returnMetaData != null) {
			replicaBrokers.clear();
			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
				replicaBrokers.add(replica.host());
			}
		}
		return returnMetaData;
	}

	/**
	 * Fetches the offset of the topic partition before the given time
	 * ({@code EarliestTime()} or {@code LatestTime()}).
	 */
	private static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
			long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
				new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
				kafka.api.OffsetRequest.CurrentVersion(), clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			throw new RuntimeException("Error fetching offset data from the Broker. Reason: "
					+ response.errorCode(topic, partition));
		}
		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.api.simple; + +import org.apache.flink.streaming.connectors.util.DeserializationSchema; + +public class KafkaDeserializingConsumerIterator<IN> extends KafkaConsumerIterator { + + private DeserializationSchema<IN> deserializationSchema; + + public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch, DeserializationSchema<IN> deserializationSchema) { + super(host, port, topic, partition, waitOnEmptyFetch); + this.deserializationSchema = deserializationSchema; + } + + public IN nextRecord() { + return deserializationSchema.deserialize(next()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java new file mode 100644 index 0000000..6b8f4dd --- /dev/null +++ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Simple mutable holder pairing a raw Kafka message (byte array) with the
 * offset it was read from. The byte array is stored by reference; no
 * defensive copy is made.
 */
public class MessageWithOffset {

	private long offset;
	private byte[] message;

	public MessageWithOffset(long messageOffset, byte[] rawMessage) {
		offset = messageOffset;
		message = rawMessage;
	}

	/** Offset the message was read from. */
	public long getOffset() {
		return offset;
	}

	public void setOffset(long messageOffset) {
		offset = messageOffset;
	}

	/** Raw message payload (shared reference, not a copy). */
	public byte[] getMessage() {
		return message;
	}

	public void setMessage(byte[] rawMessage) {
		message = rawMessage;
	}
}
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java new file mode 100644 index 0000000..db75571 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.api.simple; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.ConnectorSource; +import org.apache.flink.streaming.connectors.util.DeserializationSchema; +import org.apache.flink.util.Collector; + +public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> { + private static final long serialVersionUID = 1L; + + private String topicId; + private final String host; + private final int port; + private KafkaConsumerIterator iterator; + + /** + * Partition index is set automatically by instance id. 
+ * @param topicId + * @param host + * @param port + */ + public SimpleKafkaSource(String topicId, + String host, int port, DeserializationSchema<OUT> deserializationSchema) { + super(deserializationSchema); + this.topicId = topicId; + this.host = host; + this.port = port; + } + + private void initializeConnection() { + int partitionIndex = getRuntimeContext().getIndexOfThisSubtask(); + iterator = new KafkaConsumerIterator(host, port, topicId, 0, 100L); + iterator.initializeFromCurrent(); + } + + @SuppressWarnings("unchecked") + @Override + public void invoke(Collector<OUT> collector) throws Exception { + while (iterator.hasNext()) { + MessageWithOffset msg = iterator.nextWithOffset(); + OUT out = schema.deserialize(msg.getMessage()); + collector.collect(out); + } + } + + @Override + public void open(Configuration config) { + initializeConnection(); + } +}