[FLINK-1638] [streaming] Kafka low level API example, documentation and fixes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed5ba95d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed5ba95d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed5ba95d

Branch: refs/heads/master
Commit: ed5ba95dee0c3aa4b8767313369778e7afce5155
Parents: 5327d56
Author: mbalassi <mbala...@apache.org>
Authored: Thu Mar 5 17:34:51 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerExample.java  |  17 ++-
 .../kafka/KafkaSimpleConsumerExample.java       |  63 +++++++++
 .../connectors/kafka/api/KafkaSource.java       |   2 +-
 .../kafka/api/simple/KafkaConsumerIterator.java | 128 ++++++++++++++++---
 .../KafkaDeserializingConsumerIterator.java     |   5 +-
 .../kafka/api/simple/KafkaTopicFactory.java     |   3 +
 .../kafka/api/simple/MessageWithOffset.java     |   3 +
 .../kafka/api/simple/PersistentKafkaSource.java |  29 ++---
 .../kafka/api/simple/SimpleKafkaSource.java     |  37 +++---
 9 files changed, 216 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index 0a0c623..d9bb7d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
 import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
 
 public class KafkaConsumerExample {
@@ -37,15 +37,10 @@ public class KafkaConsumerExample {
 
                StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
 
-               @SuppressWarnings("unused")
-               DataStream<String> stream1 = env
-                               .addSource(
-//                                             new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
-//                                             new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
-                                               new PersistentKafkaSource<String>(topic, host, port, 10L, new JavaDefaultStringSchema()))
-                               .registerState("kafka", new OperatorState<Long>(null))
-                               .setParallelism(3)
-                               .print().setParallelism(3);
+               DataStream<String> kafkaStream = env
+                               .addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
+
+               kafkaStream.print();
 
                env.execute();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
new file mode 100644
index 0000000..b8d4d2c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
+import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+
+public class KafkaSimpleConsumerExample {
+
+       private static String host;
+       private static int port;
+       private static String topic;
+       private static int partition;
+       private static long offset;
+
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
+
+               DataStream<String> kafkaStream = env
+                               .addSource(new PersistentKafkaSource<String>(topic, host, port, partition, offset, new JavaDefaultStringSchema()));
+
+               kafkaStream.print();
+
+               env.execute();
+       }
+
+       private static boolean parseParameters(String[] args) {
+               if (args.length == 5) {
+                       host = args[0];
+                       port = Integer.parseInt(args[1]);
+                       topic = args[2];
+                       partition = Integer.parseInt(args[3]);
+                       offset = Long.parseLong(args[4]);
+                       return true;
+               } else {
+                       System.err.println("Usage: KafkaSimpleConsumerExample <host> <port> <topic> <partition> <offset>");
+                       return false;
+               }
+       }
+
+}

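For reference, the new example would be launched with five arguments, e.g.
"KafkaSimpleConsumerExample localhost 9092 test-topic 0 0", where the host,
port, topic name, partition and starting offset are placeholder values
assuming a broker running locally.
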
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 0c6cd4a..1baaba7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 
 /**
- * Source that listens to a Kafka topic.
+ * Source that listens to a Kafka topic using the high-level Kafka API.
  * 
  * @param <OUT>
  *            Type of the messages on the topic.

http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
index 6a01e43..92d351a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
@@ -36,11 +36,18 @@ import kafka.javaapi.TopicMetadata;
 import kafka.javaapi.TopicMetadataRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Iterates over the records received from a partition of a Kafka topic as byte arrays.
+ */
 public class KafkaConsumerIterator {
 
        private static final long serialVersionUID = 1L;
 
+       private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 1000L;
+
        private List<String> hosts;
        private String topic;
        private int port;
@@ -54,11 +61,21 @@ public class KafkaConsumerIterator {
        private transient Iterator<MessageAndOffset> iter;
        private transient FetchResponse fetchResponse;
 
-       public KafkaConsumerIterator(String host, int port, String topic, int partition,
+       /**
+        * Constructor with configurable wait time on empty fetch. For connecting to the Kafka service
+        * we use the so-called simple or low-level Kafka API, thus connecting directly to one of the brokers.
+        *
+        * @param hostName Hostname of a known Kafka broker
+        * @param port Port of the known Kafka broker
+        * @param topic Name of the topic to listen to
+        * @param partition Partition in the chosen topic
+        * @param waitOnEmptyFetch Wait time on an empty fetch, in milliseconds
+        */
+       public KafkaConsumerIterator(String hostName, int port, String topic, int partition,
                        long waitOnEmptyFetch) {
 
                this.hosts = new ArrayList<String>();
-               hosts.add(host);
+               hosts.add(hostName);
                this.port = port;
 
                this.topic = topic;
@@ -68,14 +85,37 @@ public class KafkaConsumerIterator {
                replicaBrokers = new ArrayList<String>();
        }
 
-       private void initialize() {
+       /**
+        * Constructor without configurable wait time on empty fetch. For connecting to the Kafka service
+        * we use the so-called simple or low-level Kafka API, thus connecting directly to one of the brokers.
+        *
+        * @param hostName Hostname of a known Kafka broker
+        * @param port Port of the known Kafka broker
+        * @param topic Name of the topic to listen to
+        * @param partition Partition in the chosen topic
+        */
+       public KafkaConsumerIterator(String hostName, int port, String topic, int partition) {
+               this(hostName, port, topic, partition, DEFAULT_WAIT_ON_EMPTY_FETCH);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Initializing a connection
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Initializes the connection by detecting the leading broker of
+        * the topic and establishing a connection to it.
+        */
+       private void initialize() throws InterruptedException {
                PartitionMetadata metadata;
                do {
                        metadata = findLeader(hosts, port, topic, partition);
-                       try {
-                               Thread.sleep(1000L);
-                       } catch (InterruptedException e) {
-                               e.printStackTrace();
+                       if (metadata == null) {
+                               try {
+                                       Thread.sleep(waitOnEmptyFetch);
+                               } catch (InterruptedException e) {
+                                       throw new InterruptedException("Establishing connection to Kafka failed");
+                               }
                        }
                } while (metadata == null);
 
@@ -90,7 +130,10 @@ public class KafkaConsumerIterator {
                consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
        }
 
-       public void initializeFromBeginning() {
+       /**
+        * Initializes a connection from the earliest available offset.
+        */
+       public void initializeFromBeginning() throws InterruptedException {
                initialize();
                readOffset = getLastOffset(consumer, topic, partition,
                                kafka.api.OffsetRequest.EarliestTime(), clientName);
@@ -98,7 +141,10 @@ public class KafkaConsumerIterator {
                resetFetchResponse(readOffset);
        }
 
-       public void initializeFromCurrent() {
+       /**
+        * Initializes a connection from the latest available offset.
+        */
+       public void initializeFromCurrent() throws InterruptedException {
                initialize();
                readOffset = getLastOffset(consumer, topic, partition,
                                kafka.api.OffsetRequest.LatestTime(), clientName);
@@ -106,28 +152,48 @@ public class KafkaConsumerIterator {
                resetFetchResponse(readOffset);
        }
 
-       public void initializeFromOffset(long offset) {
+       /**
+        * Initializes a connection from the specified offset.
+        *
+        * @param offset Desired Kafka offset
+        */
+       public void initializeFromOffset(long offset) throws InterruptedException {
                initialize();
                readOffset = offset;
                resetFetchResponse(readOffset);
        }
 
+
+       // --------------------------------------------------------------------------------------------
+       //  Iterator methods
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Convenience method to emulate iterator behaviour.
+        *
+        * @return whether the iterator has a next element
+        */
        public boolean hasNext() {
                return true;
        }
 
-       public byte[] next() {
+       /**
+        * Returns the next message received from Kafka as a
+        * byte array.
+        *
+        * @return next message as a byte array.
+        */
+       public byte[] next() throws InterruptedException {
                return nextWithOffset().getMessage();
        }
 
-       private void resetFetchResponse(long offset) {
-               FetchRequest req = new FetchRequestBuilder().clientId(clientName)
-                               .addFetch(topic, partition, offset, 100000).build();
-               fetchResponse = consumer.fetch(req);
-               iter = fetchResponse.messageSet(topic, partition).iterator();
-       }
-
-       public MessageWithOffset nextWithOffset() {
+       /**
+        * Returns the next message and its offset received from
+        * Kafka encapsulated in a POJO.
+        *
+        * @return next message and its offset.
+        */
+       public MessageWithOffset nextWithOffset() throws InterruptedException {
 
                synchronized (fetchResponse) {
                        while (!iter.hasNext()) {
@@ -135,7 +201,7 @@ public class KafkaConsumerIterator {
                                try {
                                        Thread.sleep(waitOnEmptyFetch);
                                } catch (InterruptedException e) {
-                                       e.printStackTrace();
+                                       throw new InterruptedException("Fetching from Kafka was interrupted");
                                }
                        }
 
@@ -152,10 +218,16 @@ public class KafkaConsumerIterator {
 
                        byte[] bytes = new byte[payload.limit()];
                        payload.get(bytes);
+
                        return new MessageWithOffset(messageAndOffset.offset(), bytes);
                }
        }
 
+       /**
+        * Resets the iterator to a given offset.
+        *
+        * @param offset Desired Kafka offset.
+        */
        public void reset(long offset) {
                synchronized (fetchResponse) {
                        readOffset = offset;
@@ -163,6 +235,20 @@ public class KafkaConsumerIterator {
                }
        }
 
+       // --------------------------------------------------------------------------------------------
+       //  Internal utilities
+       // --------------------------------------------------------------------------------------------
+
+       private void resetFetchResponse(long offset) {
+       private void resetFetchResponse(long offset) {
+               FetchRequest req = new FetchRequestBuilder().clientId(clientName)
+                               .addFetch(topic, partition, offset, 100000).build();
+               fetchResponse = consumer.fetch(req);
+
+               //TODO deal with broker failures
+
+               iter = fetchResponse.messageSet(topic, partition).iterator();
+       }
+
        private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_topic,
                        int a_partition) {
                PartitionMetadata returnMetaData = null;
@@ -212,7 +298,7 @@ public class KafkaConsumerIterator {
                OffsetResponse response = consumer.getOffsetsBefore(request);
 
                if (response.hasError()) {
-                       throw new RuntimeException("Error fetching data Offset Data the Broker. Reason: "
+                       throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
                                        + response.errorCode(topic, partition));
                }
                long[] offsets = response.offsets(topic, partition);

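For illustration, a minimal sketch of how the iterator above is meant to be
driven (the broker address, port and topic are placeholder values; since
nextWithOffset() blocks on empty fetches, hasNext() is always true):

    import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaConsumerIterator;
    import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithOffset;

    public class KafkaIteratorSketch {
        public static void main(String[] args) throws InterruptedException {
            // Connect to partition 0 of "test-topic" via a hypothetical local broker
            KafkaConsumerIterator iterator = new KafkaConsumerIterator("localhost", 9092, "test-topic", 0);
            iterator.initializeFromBeginning();
            while (iterator.hasNext()) {
                MessageWithOffset msg = iterator.nextWithOffset();
                System.out.println(msg.getOffset() + " :: " + new String(msg.getMessage()));
            }
        }
    }
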
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
index e1d02ef..6ca4c81 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
@@ -23,12 +23,13 @@ public class KafkaDeserializingConsumerIterator<IN> extends KafkaConsumerIterato
 
        private DeserializationSchema<IN> deserializationSchema;
 
-       public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch, DeserializationSchema<IN> deserializationSchema) {
+       public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch,
+                       DeserializationSchema<IN> deserializationSchema) {
                super(host, port, topic, partition, waitOnEmptyFetch);
                this.deserializationSchema = deserializationSchema;
        }
 
-       public IN nextRecord() {
+       public IN nextRecord() throws InterruptedException {
                return deserializationSchema.deserialize(next());
        }
 

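A minimal sketch of the deserializing variant, assuming the same placeholder
broker and topic as above and an enclosing method that may throw
InterruptedException:

    // Deserializes each raw Kafka message into a String via the connector's schema
    KafkaDeserializingConsumerIterator<String> iterator =
            new KafkaDeserializingConsumerIterator<String>("localhost", 9092, "test-topic", 0, 1000L,
                    new JavaDefaultStringSchema());
    iterator.initializeFromCurrent();
    while (iterator.hasNext()) {
        String record = iterator.nextRecord();
        System.out.println(record);
    }
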
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
index f949b9a..9e6dea7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
@@ -26,6 +26,9 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
 
 import kafka.admin.AdminUtils;
 
+/**
+ * Factory for creating custom Kafka topics.
+ */
 public class KafkaTopicFactory {
 
        public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) {

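For reference, the factory is invoked along these lines (the ZooKeeper
address, topic name, partition count and replication factor below are
placeholder values):

    // Create "test-topic" with 2 partitions and replication factor 1,
    // assuming a ZooKeeper instance at localhost:2181
    KafkaTopicFactory.createTopic("localhost:2181", "test-topic", 2, 1);
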
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
index 6b8f4dd..c5b8e32 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.connectors.kafka.api.simple;
 
+/**
+ * POJO encapsulating a record received from Kafka together with its offset.
+ */
 public class MessageWithOffset {
        private long offset;
        private byte[] message;

http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 5f15d12..fd428c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -23,6 +23,14 @@ import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 
+/**
+ * Kafka source persisting its offset through the {@link OperatorState} interface.
+ * This allows the offset to be restored to the latest one that has been acknowledged
+ * by the whole execution graph.
+ *
+ * @param <OUT>
+ *            Type of the messages on the topic.
+ */
 public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 
        private static final long serialVersionUID = 1L;
@@ -31,22 +39,14 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 
        private transient OperatorState<Long> kafkaOffSet;
 
-       /**
-        * Partition index is set automatically by instance id.
-        * 
-        * @param topicId
-        * @param host
-        * @param port
-        * @param deserializationSchema
-        */
-       public PersistentKafkaSource(String topicId, String host, int port, long initialOffset,
+       public PersistentKafkaSource(String topicId, String host, int port, int partition, long initialOffset,
                        DeserializationSchema<OUT> deserializationSchema) {
-               super(topicId, host, port, deserializationSchema);
+               super(topicId, host, port, partition, deserializationSchema);
                this.initialOffset = initialOffset;
        }
 
        @Override
-       public void open(Configuration parameters) {
+       public void open(Configuration parameters) throws InterruptedException {
                StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
                @SuppressWarnings("unchecked")
                OperatorState<Long> lastKafkaOffSet = (OperatorState<Long>) context.getState("kafka");
@@ -62,21 +62,16 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
        }
 
        @Override
-       protected void setInitialOffset(Configuration config) {
+       protected void setInitialOffset(Configuration config) throws InterruptedException {
                iterator.initializeFromOffset(kafkaOffSet.getState());
        }
 
-       @Override
-       protected void gotMessage(MessageWithOffset msg) {
-               System.out.println(msg.getOffset() + " :: " + schema.deserialize(msg.getMessage()));
-       }
 
        @Override
        public void run(Collector<OUT> collector) throws Exception {
                MessageWithOffset msg;
                while (iterator.hasNext()) {
                        msg = iterator.nextWithOffset();
-                       gotMessage(msg);
                        OUT out = schema.deserialize(msg.getMessage());
                        collector.collect(out);
                        kafkaOffSet.update(msg.getOffset());

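Since open() above restores the offset from the operator state registered
under the "kafka" key, a job using the persistent source would be wired
roughly as follows (host, port, topic, partition and initial offset are
placeholder values):

    DataStream<String> kafkaStream = env
            .addSource(new PersistentKafkaSource<String>("test-topic", "localhost", 9092, 0, 0L,
                    new JavaDefaultStringSchema()))
            .registerState("kafka", new OperatorState<Long>(null));
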
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
index fa925b3..61fd173 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
@@ -23,47 +23,42 @@ import org.apache.flink.streaming.connectors.ConnectorSource;
 import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 
+/**
+ * Source that listens to a Kafka topic using the low-level (simple) Kafka API.
+ *
+ * @param <OUT>
+ *            Type of the messages on the topic.
+ */
 public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
        private static final long serialVersionUID = 1L;
 
        private String topicId;
-       private final String host;
+       private final String hostName;
        private final int port;
+       private final int partition;
        protected KafkaConsumerIterator iterator;
 
-       /**
-        * Partition index is set automatically by instance id.
-        * @param topicId
-        * @param host
-        * @param port
-        * @param deserializationSchema
-        */
-       public SimpleKafkaSource(String topicId, String host, int port, DeserializationSchema<OUT> deserializationSchema) {
+       public SimpleKafkaSource(String topic, String hostName, int port, int partition,
+                       DeserializationSchema<OUT> deserializationSchema) {
                super(deserializationSchema);
-               this.topicId = topicId;
-               this.host = host;
+               this.topicId = topic;
+               this.hostName = hostName;
                this.port = port;
+               this.partition = partition;
        }
 
        private void initializeConnection() {
-               //TODO: Fix this
-               int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
-               iterator = new KafkaConsumerIterator(host, port, topicId, 0, 100L);
+               iterator = new KafkaConsumerIterator(hostName, port, topicId, partition);
        }
 
-       protected void setInitialOffset(Configuration config) {
+       protected void setInitialOffset(Configuration config) throws InterruptedException {
                iterator.initializeFromCurrent();
        }
 
-       //This just for debug purposes
-       protected void gotMessage(MessageWithOffset msg) {
-       }
-
        @Override
        public void run(Collector<OUT> collector) throws Exception {
                while (iterator.hasNext()) {
                        MessageWithOffset msg = iterator.nextWithOffset();
-                       gotMessage(msg);
                        OUT out = schema.deserialize(msg.getMessage());
                        collector.collect(out);
                }
@@ -75,7 +70,7 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
 
 
        @Override
-       public void open(Configuration config) {
+       public void open(Configuration config) throws InterruptedException {
                initializeConnection();
                setInitialOffset(config);
        }

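Analogously, the simple (non-persistent) source would be attached with the
new five-argument constructor (placeholder values again):

    DataStream<String> kafkaStream = env
            .addSource(new SimpleKafkaSource<String>("test-topic", "localhost", 9092, 0,
                    new JavaDefaultStringSchema()));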