[FLINK-1638] [streaming] Kafka low level API example, documentation and fixes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed5ba95d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed5ba95d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed5ba95d Branch: refs/heads/master Commit: ed5ba95dee0c3aa4b8767313369778e7afce5155 Parents: 5327d56 Author: mbalassi <mbala...@apache.org> Authored: Thu Mar 5 17:34:51 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Mar 10 14:58:49 2015 +0100 ---------------------------------------------------------------------- .../connectors/kafka/KafkaConsumerExample.java | 14 ++- .../kafka/KafkaSimpleConsumerExample.java | 63 +++++++++ .../connectors/kafka/api/KafkaSource.java | 2 +- .../kafka/api/simple/KafkaConsumerIterator.java | 128 ++++++++++++++++--- .../KafkaDeserializingConsumerIterator.java | 5 +- .../kafka/api/simple/KafkaTopicFactory.java | 3 + .../kafka/api/simple/MessageWithOffset.java | 3 + .../kafka/api/simple/PersistentKafkaSource.java | 29 ++--- .../kafka/api/simple/SimpleKafkaSource.java | 37 +++--- 9 files changed, 213 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java index 0a0c623..d9bb7d3 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java @@ -20,7 +20,8 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.runtime.state.OperatorState; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.api.KafkaSource; import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema; public class KafkaConsumerExample { @@ -37,15 +38,10 @@ public class KafkaConsumerExample { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4); - @SuppressWarnings("unused") - DataStream<String> stream1 = env - .addSource( -// new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema())) -// new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema())) - new PersistentKafkaSource<String>(topic, host, port, 10L, new JavaDefaultStringSchema())) - .registerState("kafka", new OperatorState<Long>(null)) - .setParallelism(3) - .print().setParallelism(3); + DataStream<String> kafkaStream = env + .addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema())); + + kafkaStream.print(); env.execute(); } http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java new file mode 100644 index 0000000..b8d4d2c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource; +import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema; + +public class KafkaSimpleConsumerExample { + + private static String host; + private static int port; + private static String topic; + private static int partition; + private static long offset; + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4); + + DataStream<String> kafkaStream = env + .addSource(new PersistentKafkaSource<String>(topic, host, port, partition, offset, new JavaDefaultStringSchema())); + + kafkaStream.print(); + + env.execute(); + } + + private static boolean parseParameters(String[] args) { + if (args.length == 5) { + host = args[0]; + port = Integer.parseInt(args[1]); + topic = args[2]; + partition = Integer.parseInt(args[3]); + offset = Long.parseLong(args[4]); + return true; + } else { + System.err.println("Usage: KafkaSimpleConsumerExample <host> <port> <topic> <partition> <offset>"); + return false; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java index 0c6cd4a..1baaba7 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java @@ -35,7 +35,7 @@ import org.apache.flink.streaming.connectors.util.DeserializationSchema; import org.apache.flink.util.Collector; /** - * Source that listens to a Kafka topic. + * Source that listens to a Kafka topic using the high level Kafka API. * * @param <OUT> * Type of the messages on the topic.
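----------------------------------------------------------------------
For reference, a minimal sketch of how the reworked low level iterator (see KafkaConsumerIterator.java below) might be driven on its own, outside of a Flink topology. The broker address localhost:9092, the topic "test-topic", partition 0 and the bounded loop are placeholder assumptions for illustration only and are not part of this commit:

import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaConsumerIterator;
import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithOffset;

public class KafkaIteratorSketch {

	public static void main(String[] args) throws InterruptedException {
		// Connect directly to one broker (placeholder host and port) and read
		// partition 0 of the placeholder topic "test-topic".
		KafkaConsumerIterator iterator = new KafkaConsumerIterator("localhost", 9092, "test-topic", 0);

		// Start from the earliest offset available on the broker.
		iterator.initializeFromBeginning();

		// hasNext() always returns true; nextWithOffset() polls until a new
		// message arrives, so the loop is bounded here only for the sketch.
		for (int i = 0; i < 100 && iterator.hasNext(); i++) {
			MessageWithOffset msg = iterator.nextWithOffset();
			System.out.println(msg.getOffset() + " :: " + new String(msg.getMessage()));
		}
	}
}
----------------------------------------------------------------------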
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java index 6a01e43..92d351a 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java @@ -36,11 +36,18 @@ import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * Iterates the records received from a partition of a Kafka topic as byte arrays. + */ public class KafkaConsumerIterator { private static final long serialVersionUID = 1L; + private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 1000L; + private List<String> hosts; private String topic; private int port; @@ -54,11 +61,21 @@ public class KafkaConsumerIterator { private transient Iterator<MessageAndOffset> iter; private transient FetchResponse fetchResponse; - public KafkaConsumerIterator(String host, int port, String topic, int partition, + /** + * Constructor with configurable wait time on empty fetch. For connecting to the Kafka service + * we use the so-called simple or low level Kafka API, connecting directly to one of the brokers. + * + * @param hostName Hostname of a known Kafka broker + * @param port Port of the known Kafka broker + * @param topic Name of the topic to listen to + * @param partition Partition in the chosen topic + * @param waitOnEmptyFetch Wait time in milliseconds when a fetch returns no messages + */ + public KafkaConsumerIterator(String hostName, int port, String topic, int partition, long waitOnEmptyFetch) { this.hosts = new ArrayList<String>(); - hosts.add(host); + hosts.add(hostName); this.port = port; this.topic = topic; @@ -68,14 +85,37 @@ public class KafkaConsumerIterator { replicaBrokers = new ArrayList<String>(); } - private void initialize() { + /** + * Constructor using the default wait time on empty fetch. For connecting to the Kafka service + * we use the so-called simple or low level Kafka API, connecting directly to one of the brokers. + * + * @param hostName Hostname of a known Kafka broker + * @param port Port of the known Kafka broker + * @param topic Name of the topic to listen to + * @param partition Partition in the chosen topic + */ + public KafkaConsumerIterator(String hostName, int port, String topic, int partition) { + this(hostName, port, topic, partition, DEFAULT_WAIT_ON_EMPTY_FETCH); + } + + // -------------------------------------------------------------------------------------------- + // Initializing a connection + // -------------------------------------------------------------------------------------------- + + /** + * Initializes the connection by detecting the lead broker of + * the topic partition and establishing a connection to it.
+ */ + private void initialize() throws InterruptedException { PartitionMetadata metadata; do { metadata = findLeader(hosts, port, topic, partition); - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - e.printStackTrace(); + if (metadata == null) { + try { + Thread.sleep(waitOnEmptyFetch); + } catch (InterruptedException e) { + throw new InterruptedException("Establishing connection to Kafka failed"); + } } } while (metadata == null); @@ -90,7 +130,10 @@ public class KafkaConsumerIterator { consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName); } - public void initializeFromBeginning() { + /** + * Initializes a connection from the earliest available offset. + */ + public void initializeFromBeginning() throws InterruptedException { initialize(); readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName); @@ -98,7 +141,10 @@ public class KafkaConsumerIterator { resetFetchResponse(readOffset); } - public void initializeFromCurrent() { + /** + * Initializes a connection from the latest available offset. + */ + public void initializeFromCurrent() throws InterruptedException { initialize(); readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName); @@ -106,28 +152,48 @@ public class KafkaConsumerIterator { resetFetchResponse(readOffset); } - public void initializeFromOffset(long offset) { + /** + * Initializes a connection from the specified offset. + * + * @param offset Desired Kafka offset + */ + public void initializeFromOffset(long offset) throws InterruptedException { initialize(); readOffset = offset; resetFetchResponse(readOffset); } + + // -------------------------------------------------------------------------------------------- + // Iterator methods + // -------------------------------------------------------------------------------------------- + + /** + * Convenience method to emulate iterator behaviour. + * + * @return whether the iterator has a next element + */ public boolean hasNext() { return true; } - public byte[] next() { + /** + * Returns the next message received from Kafka as a + * byte array. + * + * @return next message as a byte array. + */ + public byte[] next() throws InterruptedException { return nextWithOffset().getMessage(); } - private void resetFetchResponse(long offset) { - FetchRequest req = new FetchRequestBuilder().clientId(clientName) - .addFetch(topic, partition, offset, 100000).build(); - fetchResponse = consumer.fetch(req); - iter = fetchResponse.messageSet(topic, partition).iterator(); - } - - public MessageWithOffset nextWithOffset() { + /** + * Returns the next message and its offset received from + * Kafka encapsulated in a POJO. + * + * @return next message and its offset. + */ + public MessageWithOffset nextWithOffset() throws InterruptedException { synchronized (fetchResponse) { while (!iter.hasNext()) { @@ -135,7 +201,7 @@ public class KafkaConsumerIterator { try { Thread.sleep(waitOnEmptyFetch); } catch (InterruptedException e) { - e.printStackTrace(); + throw new InterruptedException("Fetching from Kafka was interrupted"); } } @@ -152,10 +218,16 @@ public class KafkaConsumerIterator { byte[] bytes = new byte[payload.limit()]; payload.get(bytes); + return new MessageWithOffset(messageAndOffset.offset(), bytes); } } + /** + * Resets the iterator to a given offset. + * + * @param offset Desired Kafka offset. 
+ */ public void reset(long offset) { synchronized (fetchResponse) { readOffset = offset; @@ -163,6 +235,20 @@ public class KafkaConsumerIterator { } } + // -------------------------------------------------------------------------------------------- + // Internal utilities + // -------------------------------------------------------------------------------------------- + + private void resetFetchResponse(long offset) { + FetchRequest req = new FetchRequestBuilder().clientId(clientName) + .addFetch(topic, partition, offset, 100000).build(); + fetchResponse = consumer.fetch(req); + + //TODO deal with broker failures + + iter = fetchResponse.messageSet(topic, partition).iterator(); + } + private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_topic, int a_partition) { PartitionMetadata returnMetaData = null; @@ -212,7 +298,7 @@ public class KafkaConsumerIterator { OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { - throw new RuntimeException("Error fetching data Offset Data the Broker. Reason: " + throw new RuntimeException("Error fetching data from Kafka broker. Reason: " + response.errorCode(topic, partition)); } long[] offsets = response.offsets(topic, partition); http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java index e1d02ef..6ca4c81 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java @@ -23,12 +23,13 @@ public class KafkaDeserializingConsumerIterator<IN> extends KafkaConsumerIterato private DeserializationSchema<IN> deserializationSchema; - public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch, DeserializationSchema<IN> deserializationSchema) { + public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch, + DeserializationSchema<IN> deserializationSchema) { super(host, port, topic, partition, waitOnEmptyFetch); this.deserializationSchema = deserializationSchema; } - public IN nextRecord() { + public IN nextRecord() throws InterruptedException { return deserializationSchema.deserialize(next()); } http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java index f949b9a..9e6dea7 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java @@ -26,6 +26,9 @@ import org.I0Itec.zkclient.serialize.ZkSerializer; import kafka.admin.AdminUtils; +/** + * Factory for creating custom Kafka topics. + */ public class KafkaTopicFactory { public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) { http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java index 6b8f4dd..c5b8e32 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java @@ -17,6 +17,9 @@ package org.apache.flink.streaming.connectors.kafka.api.simple; +/** + * POJO encapsulating records received from Kafka with their offset. + */ public class MessageWithOffset { private long offset; private byte[] message; http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java index 5f15d12..fd428c0 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java @@ -23,6 +23,14 @@ import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.util.DeserializationSchema; import org.apache.flink.util.Collector; +/** + * Kafka source persisting its offset through the {@link OperatorState} interface. + * This allows the offset to be restored to the latest one that has been acknowledged + * by the whole execution graph. + * + * @param <OUT> + * Type of the messages on the topic.
+ */ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> { private static final long serialVersionUID = 1L; @@ -31,22 +39,14 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> { private transient OperatorState<Long> kafkaOffSet; - /** - * Partition index is set automatically by instance id. - * - * @param topicId - * @param host - * @param port - * @param deserializationSchema - */ - public PersistentKafkaSource(String topicId, String host, int port, long initialOffset, + public PersistentKafkaSource(String topicId, String host, int port, int partition, long initialOffset, DeserializationSchema<OUT> deserializationSchema) { - super(topicId, host, port, deserializationSchema); + super(topicId, host, port, partition, deserializationSchema); this.initialOffset = initialOffset; } @Override - public void open(Configuration parameters) { + public void open(Configuration parameters) throws InterruptedException { StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); @SuppressWarnings("unchecked") OperatorState<Long> lastKafkaOffSet = (OperatorState<Long>) context.getState("kafka"); @@ -62,21 +62,16 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> { } @Override - protected void setInitialOffset(Configuration config) { + protected void setInitialOffset(Configuration config) throws InterruptedException{ iterator.initializeFromOffset(kafkaOffSet.getState()); } - @Override - protected void gotMessage(MessageWithOffset msg) { - System.out.println(msg.getOffset() + " :: " + schema.deserialize(msg.getMessage())); - } @Override public void run(Collector<OUT> collector) throws Exception { MessageWithOffset msg; while (iterator.hasNext()) { msg = iterator.nextWithOffset(); - gotMessage(msg); OUT out = schema.deserialize(msg.getMessage()); collector.collect(out); kafkaOffSet.update(msg.getOffset()); http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java index fa925b3..61fd173 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java @@ -23,47 +23,42 @@ import org.apache.flink.streaming.connectors.ConnectorSource; import org.apache.flink.streaming.connectors.util.DeserializationSchema; import org.apache.flink.util.Collector; +/** + * Source that listens to a Kafka topic using the low level or simple Kafka API. + * + * @param <OUT> + * Type of the messages on the topic. + */ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> { private static final long serialVersionUID = 1L; private String topicId; - private final String host; + private final String hostName; private final int port; + private final int partition; protected KafkaConsumerIterator iterator; - /** - * Partition index is set automatically by instance id. 
- * @param topicId - * @param host - * @param port - * @param deserializationSchema - */ - public SimpleKafkaSource(String topicId, String host, int port, DeserializationSchema<OUT> deserializationSchema) { + public SimpleKafkaSource(String topic, String hostName, int port, int partition, + DeserializationSchema<OUT> deserializationSchema) { super(deserializationSchema); - this.topicId = topicId; - this.host = host; + this.topicId = topic; + this.hostName = hostName; this.port = port; + this.partition = partition; } private void initializeConnection() { - //TODO: Fix this - int partitionIndex = getRuntimeContext().getIndexOfThisSubtask(); - iterator = new KafkaConsumerIterator(host, port, topicId, 0, 100L); + iterator = new KafkaConsumerIterator(hostName, port, topicId, partition); } - protected void setInitialOffset(Configuration config) { + protected void setInitialOffset(Configuration config) throws InterruptedException { iterator.initializeFromCurrent(); } - //This just for debug purposes - protected void gotMessage(MessageWithOffset msg) { - } - @Override public void run(Collector<OUT> collector) throws Exception { while (iterator.hasNext()) { MessageWithOffset msg = iterator.nextWithOffset(); - gotMessage(msg); OUT out = schema.deserialize(msg.getMessage()); collector.collect(out); } @@ -75,7 +70,7 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> { @Override - public void open(Configuration config) { + public void open(Configuration config) throws InterruptedException { initializeConnection(); setInitialOffset(config); }
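----------------------------------------------------------------------
For completeness, a minimal sketch of how the offset persistence in PersistentKafkaSource might be wired into a job. It mirrors the registerState() call that this commit removes from KafkaConsumerExample, since open() looks the state up under the name "kafka"; the null handling when no state is registered is not visible in this diff, and the host, port, topic, partition and initial offset below are placeholder values:

import org.apache.flink.runtime.state.OperatorState;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;

public class PersistentKafkaSourceSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(1);

		// Read partition 0 of the placeholder topic "test-topic" from offset 0 and
		// keep the last emitted offset in the OperatorState registered as "kafka".
		env.addSource(new PersistentKafkaSource<String>("test-topic", "localhost", 9092, 0, 0L,
						new JavaDefaultStringSchema()))
				.registerState("kafka", new OperatorState<Long>(null))
				.print();

		env.execute();
	}
}
----------------------------------------------------------------------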