http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
deleted file mode 100644
index a38c3bd..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Hacky wrapper to send an object instance through a Properties - map.
- *
- * This works as follows:
- * The recommended way of creating a KafkaSink is specifying a classname for the partitioner.
- *
- * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink.
- * This is set in the key-value (java.util.Properties) map.
- * In addition to that, we use the Properties.put(Object, Object) to store the instance of the (serializable).
- * This is a hack because the put() method is called on the underlying Hashmap.
- *
- * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
- *
- * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
- */
-public class PartitionerWrapper implements Partitioner {
-       public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
-
-       private Partitioner wrapped;
-       public PartitionerWrapper(VerifiableProperties properties) {
-               wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
-       }
-
-       @Override
-       public int partition(Object value, int numberOfPartitions) {
-               return wrapped.partition(value, numberOfPartitions);
-       }
-}

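For context: the removed PartitionerWrapper relied on the fact that java.util.Properties extends Hashtable, so put(Object, Object) can smuggle a live object instance past the String-only Properties contract. The sketch below is not part of the commit; it only illustrates that trick under the old Kafka 0.8 producer API, and the helper class and method names are hypothetical.

package org.apache.flink.streaming.connectors.kafka.internals;

import java.util.Properties;

import kafka.producer.Partitioner;

// Hypothetical helper: how a producer configuration could carry a Partitioner
// *instance* to the reflectively instantiated PartitionerWrapper.
final class PartitionerWrapperUsageSketch {

    static Properties withInstancePartitioner(Properties producerConfig, Partitioner userPartitioner) {
        Properties props = new Properties();
        props.putAll(producerConfig);
        // Kafka creates the partitioner from this class name and passes it the
        // VerifiableProperties built from these properties.
        props.put("partitioner.class", PartitionerWrapper.class.getName());
        // The hack: store the actual instance under the wrapper's well-known key;
        // PartitionerWrapper reads it back via properties.props().get(...).
        props.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, userPartitioner);
        return props;
    }

    private PartitionerWrapperUsageSketch() {}
}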
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
new file mode 100644
index 0000000..6aaeca9
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A thread that periodically writes the current Kafka partition offsets to Zookeeper.
+ */
+public class PeriodicOffsetCommitter extends Thread {
+
+       /** The ZooKeeper handler */
+       private final ZookeeperOffsetHandler offsetHandler;
+       
+       private final KafkaTopicPartitionState<?>[] partitionStates;
+       
+       /** The proxy to forward exceptions to the main thread */
+       private final ExceptionProxy errorHandler;
+       
+       /** Interval in which to commit, in milliseconds */
+       private final long commitInterval;
+       
+       /** Flag to mark the periodic committer as running */
+       private volatile boolean running = true;
+
+       PeriodicOffsetCommitter(ZookeeperOffsetHandler offsetHandler,
+                       KafkaTopicPartitionState<?>[] partitionStates,
+                       ExceptionProxy errorHandler,
+                       long commitInterval)
+       {
+               this.offsetHandler = checkNotNull(offsetHandler);
+               this.partitionStates = checkNotNull(partitionStates);
+               this.errorHandler = checkNotNull(errorHandler);
+               this.commitInterval = commitInterval;
+               
+               checkArgument(commitInterval > 0);
+       }
+
+       @Override
+       public void run() {
+               try {
+                       while (running) {
+                               Thread.sleep(commitInterval);
+
+                               // create a deep copy of the current offsets
+                               HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>(partitionStates.length);
+                               for (KafkaTopicPartitionState<?> partitionState : partitionStates) {
+                                       currentOffsets.put(partitionState.getKafkaTopicPartition(), partitionState.getOffset());
+                               }
+                               
+                               offsetHandler.writeOffsets(currentOffsets);
+                       }
+               }
+               catch (Throwable t) {
+                       if (running) {
+                               errorHandler.reportError(
+                                               new Exception("The periodic offset committer encountered an error: " + t.getMessage(), t));
+                       }
+               }
+       }
+
+       public void shutdown() {
+               this.running = false;
+               this.interrupt();
+       }
+}

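The committer added above is controlled entirely through its constructor arguments and the volatile running flag. The wiring sketch below is an assumption for illustration: the real Kafka08Fetcher integration is not part of this diff, and the helper class is hypothetical (placed in the same package because the constructor is package-private).

package org.apache.flink.streaming.connectors.kafka.internals;

// Hypothetical helper: intended lifecycle of PeriodicOffsetCommitter.
final class OffsetCommitterWiringSketch {

    static PeriodicOffsetCommitter start(
            ZookeeperOffsetHandler zkHandler,
            KafkaTopicPartitionState<?>[] partitionStates,
            ExceptionProxy errorProxy,
            long commitIntervalMillis) {

        // Every commitIntervalMillis the thread copies the current offsets out of
        // the partition states and writes them to ZooKeeper through the handler.
        PeriodicOffsetCommitter committer = new PeriodicOffsetCommitter(
                zkHandler, partitionStates, errorProxy, commitIntervalMillis);
        committer.setDaemon(true);
        committer.start();
        return committer;
    }

    static void stop(PeriodicOffsetCommitter committer) {
        // shutdown() clears the running flag and interrupts the sleep, so the
        // thread exits without reporting an error to the ExceptionProxy.
        committer.shutdown();
    }

    private OffsetCommitterWiringSketch() {}
}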
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
new file mode 100644
index 0000000..491ffad
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.kafka.common.Node;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getIntFromConfig;
+
+/**
+ * This class implements a thread with a connection to a single Kafka broker. The thread
+ * pulls records for a set of topic partitions for which the connected broker is currently
+ * the leader. The thread deserializes these records and emits them. 
+ * 
+ * @param <T> The type of elements that this consumer thread creates from Kafka's byte messages
+ *            and emits into the Flink DataStream.
+ */
+class SimpleConsumerThread<T> extends Thread {
+
+       private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class);
+
+       private static final KafkaTopicPartitionState<TopicAndPartition> MARKER = Kafka08Fetcher.MARKER;
+       
+       // ------------------------------------------------------------------------
+
+       private final Kafka08Fetcher<T> owner;
+       
+       private final KeyedDeserializationSchema<T> deserializer;
+
+       private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions;
+
+       private final Node broker;
+
+       /** Queue containing new fetch partitions for the consumer thread */
+       private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue;
+       
+       private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions;
+       
+       private final ExceptionProxy errorHandler;
+       
+       private final long invalidOffsetBehavior;
+       
+       private volatile boolean running = true;
+       
+
+       // ----------------- Simple Consumer ----------------------
+       private volatile SimpleConsumer consumer;
+
+       private final int soTimeout;
+       private final int minBytes;
+       private final int maxWait;
+       private final int fetchSize;
+       private final int bufferSize;
+       private final int reconnectLimit;
+
+
+       // exceptions are thrown locally
+       public SimpleConsumerThread(
+                       Kafka08Fetcher<T> owner,
+                       ExceptionProxy errorHandler,
+                       Properties config,
+                       Node broker,
+                       List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
+                       ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions,
+                       KeyedDeserializationSchema<T> deserializer,
+                       long invalidOffsetBehavior)
+       {
+               this.owner = owner;
+               this.errorHandler = errorHandler;
+               this.broker = broker;
+               this.partitions = seedPartitions;
+               this.deserializer = requireNonNull(deserializer);
+               this.unassignedPartitions = requireNonNull(unassignedPartitions);
+               this.newPartitionsQueue = new ClosableBlockingQueue<>();
+               this.invalidOffsetBehavior = invalidOffsetBehavior;
+               
+               // these are the actual configuration values of Kafka + their original default values.
+               this.soTimeout = getIntFromConfig(config, "socket.timeout.ms", 30000);
+               this.minBytes = getIntFromConfig(config, "fetch.min.bytes", 1);
+               this.maxWait = getIntFromConfig(config, "fetch.wait.max.ms", 100);
+               this.fetchSize = getIntFromConfig(config, "fetch.message.max.bytes", 1048576);
+               this.bufferSize = getIntFromConfig(config, "socket.receive.buffer.bytes", 65536);
+               this.reconnectLimit = getIntFromConfig(config, "flink.simple-consumer-reconnectLimit", 3);
+       }
+
+       public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {
+               return newPartitionsQueue;
+       }
+       
+       // ------------------------------------------------------------------------
+       //  main work loop
+       // ------------------------------------------------------------------------
+       
+       @Override
+       public void run() {
+               LOG.info("Starting to fetch from {}", this.partitions);
+
+               // set up the config values
+               final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
+
+               try {
+                       // create the Kafka consumer that we actually use for fetching
+                       consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+                       
+                       // make sure that all partitions have some offsets to start with
+                       // those partitions that do not have an offset from a checkpoint need to get
+                       // their start offset from ZooKeeper
+                       getMissingOffsetsFromKafka(partitions);
+
+                       // Now, the actual work starts :-)
+                       int offsetOutOfRangeCount = 0;
+                       int reconnects = 0;
+                       while (running) {
+
+                               // ----------------------------------- partitions list maintenance ----------------------------
+
+                               // check queue for new partitions to read from:
+                               List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch();
+                               if (newPartitions != null) {
+                                       // found some new partitions for this thread's broker
+                                       
+                                       // check if the new partitions need an offset lookup
+                                       getMissingOffsetsFromKafka(newPartitions);
+                                       
+                                       // add the new partitions (and check they are not already in there)
+                                       for (KafkaTopicPartitionState<TopicAndPartition> newPartition: newPartitions) {
+                                               if (partitions.contains(newPartition)) {
+                                                       throw new IllegalStateException("Adding partition " + newPartition + 
+                                                                       " to subscribed partitions even though it is already subscribed");
+                                               }
+                                               partitions.add(newPartition);
+                                       }
+                                       
+                                       LOG.info("Adding {} new partitions to consumer thread {}", newPartitions.size(), getName());
+                                       LOG.debug("Partitions list: {}", newPartitions);
+                               }
+
+                               if (partitions.size() == 0) {
+                                       if (newPartitionsQueue.close()) {
+                                               // close succeeded. Closing thread
+                                               running = false;
+                                               
+                                               LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.", 
+                                                               getName());
+
+                                               // add the wake-up marker into the queue to make the main thread
+                                               // immediately wake up and terminate faster
+                                               unassignedPartitions.add(MARKER);
+
+                                               break;
+                                       } else {
+                                               // close failed: fetcher main thread concurrently added new partitions into the queue.
+                                               // go to top of loop again and get the new partitions
+                                               continue; 
+                                       }
+                               }
+
+                               // ----------------------------------- request / response with kafka ----------------------------
+
+                               FetchRequestBuilder frb = new FetchRequestBuilder();
+                               frb.clientId(clientId);
+                               frb.maxWait(maxWait);
+                               frb.minBytes(minBytes);
+
+                               for (KafkaTopicPartitionState<?> partition : partitions) {
+                                       frb.addFetch(
+                                                       partition.getKafkaTopicPartition().getTopic(),
+                                                       partition.getKafkaTopicPartition().getPartition(),
+                                                       partition.getOffset() + 1, // request the next record
+                                                       fetchSize);
+                               }
+                               
+                               kafka.api.FetchRequest fetchRequest = frb.build();
+                               LOG.debug("Issuing fetch request {}", fetchRequest);
+
+                               FetchResponse fetchResponse;
+                               try {
+                                       fetchResponse = consumer.fetch(fetchRequest);
+                               }
+                               catch (Throwable cce) {
+                                       //noinspection ConstantConditions
+                                       if (cce instanceof ClosedChannelException) {
+                                               LOG.warn("Fetch failed because of ClosedChannelException.");
+                                               LOG.debug("Full exception", cce);
+                                               
+                                               // we don't know if the broker is overloaded or unavailable.
+                                               // retry a few times, then return ALL partitions for new leader lookup
+                                               if (++reconnects >= reconnectLimit) {
+                                                       LOG.warn("Unable to reach broker after {} retries. Returning all current partitions", reconnectLimit);
+                                                       for (KafkaTopicPartitionState<TopicAndPartition> fp: this.partitions) {
+                                                               unassignedPartitions.add(fp);
+                                                       }
+                                                       this.partitions.clear();
+                                                       continue; // jump to top of loop: will close thread or subscribe to new partitions
+                                               }
+                                               try {
+                                                       consumer.close();
+                                               } catch (Throwable t) {
+                                                       LOG.warn("Error while closing consumer connection", t);
+                                               }
+                                               // delay & retry
+                                               Thread.sleep(100);
+                                               consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
+                                               continue; // retry
+                                       } else {
+                                               throw cce;
+                                       }
+                               }
+                               reconnects = 0;
+
+                               // ---------------------------------------- error handling ----------------------------
+
+                               if (fetchResponse == null) {
+                                       throw new IOException("Fetch from Kafka failed (request returned null)");
+                               }
+                               
+                               if (fetchResponse.hasError()) {
+                                       String exception = "";
+                                       
List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = 
new ArrayList<>();
+                                       
+                                       // iterate over partitions to get 
individual error codes
+                                       
Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = 
partitions.iterator();
+                                       boolean partitionsRemoved = false;
+                                       
+                                       while (partitionsIterator.hasNext()) {
+                                               final 
KafkaTopicPartitionState<TopicAndPartition> fp = partitionsIterator.next();
+                                               short code = 
fetchResponse.errorCode(fp.getTopic(), fp.getPartition());
+
+                                               if (code == 
ErrorMapping.OffsetOutOfRangeCode()) {
+                                                       // we were asked to 
read from an out-of-range-offset (maybe set wrong in Zookeeper)
+                                                       // Kafka's high level 
consumer is resetting the offset according to 'auto.offset.reset'
+                                                       
partitionsToGetOffsetsFor.add(fp);
+                                               }
+                                               else if (code == 
ErrorMapping.NotLeaderForPartitionCode() ||
+                                                               code == 
ErrorMapping.LeaderNotAvailableCode() ||
+                                                               code == 
ErrorMapping.BrokerNotAvailableCode() ||
+                                                               code == 
ErrorMapping.UnknownCode())
+                                               {
+                                                       // the broker we are 
connected to is not the leader for the partition.
+                                                       LOG.warn("{} is not the 
leader of {}. Reassigning leader for partition", broker, fp);
+                                                       LOG.debug("Error code = 
{}", code);
+
+                                                       
unassignedPartitions.add(fp);
+
+                                                       
partitionsIterator.remove(); // unsubscribe the partition ourselves
+                                                       partitionsRemoved = 
true;
+                                               }
+                                               else if (code != 
ErrorMapping.NoError()) {
+                                                       exception += 
"\nException for " + fp.getTopic() +":"+ fp.getPartition() + ": " +
+                                                                       
StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
+                                               }
+                                       }
+                                       if (partitionsToGetOffsetsFor.size() > 
0) {
+                                               // safeguard against an 
infinite loop.
+                                               if (offsetOutOfRangeCount++ > 
3) {
+                                                       throw new 
RuntimeException("Found invalid offsets more than three times in partitions "
+                                                                       + 
partitionsToGetOffsetsFor + " Exceptions: " + exception);
+                                               }
+                                               // get valid offsets for these 
partitions and try again.
+                                               LOG.warn("The following 
partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
+                                               
getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, 
invalidOffsetBehavior);
+                                               
+                                               LOG.warn("The new partition 
offsets are {}", partitionsToGetOffsetsFor);
+                                               continue; // jump back to 
create a new fetch request. The offset has not been touched.
+                                       }
+                                       else if (partitionsRemoved) {
+                                               continue; // create new fetch request
+                                       }
+                                       else {
+                                               // partitions failed on an error
+                                               throw new IOException("Error while fetching from broker '" + broker +"': " + exception);
+                                       }
+                               } else {
+                                       // successful fetch, reset offsetOutOfRangeCount.
+                                       offsetOutOfRangeCount = 0;
+                               }
+
+                               // ----------------------------------- process fetch response ----------------------------
+
+                               int messagesInFetch = 0;
+                               int deletedMessages = 0;
+                               Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();
+                               
+                               partitionsLoop:
+                               while (partitionsIterator.hasNext()) {
+                                       final KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next();
+                                       
+                                       final ByteBufferMessageSet messageSet = fetchResponse.messageSet(
+                                                       currentPartition.getTopic(), currentPartition.getPartition());
+
+                                       for (MessageAndOffset msg : messageSet) {
+                                               if (running) {
+                                                       messagesInFetch++;
+                                                       final ByteBuffer payload = msg.message().payload();
+                                                       final long offset = msg.offset();
+                                                       
+                                                       if (offset <= currentPartition.getOffset()) {
+                                                               // we have seen this message already
+                                                               LOG.info("Skipping message with offset " + msg.offset()
+                                                                               + " because we have seen messages until (including) "
+                                                                               + currentPartition.getOffset()
+                                                                               + " from topic/partition " + currentPartition.getTopic() + '/'
+                                                                               + currentPartition.getPartition() + " already");
+                                                               continue;
+                                                       }
+
+                                                       // If the message value is null, this represents a delete command for the message key.
+                                                       // Log this and pass it on to the client who might want to also receive delete messages.
+                                                       byte[] valueBytes;
+                                                       if (payload == null) {
+                                                               deletedMessages++;
+                                                               valueBytes = null;
+                                                       } else {
+                                                               valueBytes = new byte[payload.remaining()];
+                                                               payload.get(valueBytes);
+                                                       }
+
+                                                       // put key into byte array
+                                                       byte[] keyBytes = null;
+                                                       int keySize = msg.message().keySize();
+
+                                                       if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization
+                                                               ByteBuffer keyPayload = msg.message().key();
+                                                               keyBytes = new byte[keySize];
+                                                               keyPayload.get(keyBytes);
+                                                       }
+
+                                                       final T value = deserializer.deserialize(keyBytes, valueBytes, 
+                                                                       currentPartition.getTopic(), currentPartition.getPartition(), offset);
+                                                       
+                                                       if (deserializer.isEndOfStream(value)) {
+                                                               // remove partition from subscribed partitions.
+                                                               partitionsIterator.remove();
+                                                               continue partitionsLoop;
+                                                       }
+                                                       
+                                                       owner.emitRecord(value, currentPartition, offset);
+                                               }
+                                               else {
+                                                       // no longer running
+                                                       return;
+                                               }
+                                       }
+                               }
+                               LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages);
+                       } // end of fetch loop
+
+                       if (!newPartitionsQueue.close()) {
+                               throw new Exception("Bug: Cleanly leaving fetcher thread without having a closed queue.");
+                       }
+               }
+               catch (Throwable t) {
+                       // report to the fetcher's error handler
+                       errorHandler.reportError(t);
+               }
+               finally {
+                       if (consumer != null) {
+                               // closing the consumer should not fail the program
+                               try {
+                                       consumer.close();
+                               }
+                               catch (Throwable t) {
+                                       LOG.error("Error while closing the Kafka simple consumer", t);
+                               }
+                       }
+               }
+       }
+
+       private void getMissingOffsetsFromKafka(
+                       List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
+       {
+               // collect which partitions we should fetch offsets for
+               List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
+               for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+                       if (!part.isOffsetDefined()) {
+                               // retrieve the offset from the consumer
+                               partitionsToGetOffsetsFor.add(part);
+                       }
+               }
+               
+               if (partitionsToGetOffsetsFor.size() > 0) {
+                       getLastOffsetFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
+                       
+                       LOG.info("No checkpoint/savepoint offsets found for some partitions. " +
+                                       "Fetched the following start offsets {}", partitionsToGetOffsetsFor);
+               }
+       }
+
+       /**
+        * Cancels this fetch thread. The thread will release all resources and terminate.
+        */
+       public void cancel() {
+               this.running = false;
+
+               // interrupt whatever the consumer is doing
+               if (consumer != null) {
+                       consumer.close();
+               }
+
+               this.interrupt();
+       }
+       
+       // ------------------------------------------------------------------------
+       //  Kafka Request Utils
+       // ------------------------------------------------------------------------
+
+       /**
+        * Request latest offsets for a set of partitions, via a Kafka consumer.
+        *
+        * <p>This method retries three times if the response has an error.
+        *
+        * @param consumer The consumer connected to lead broker
+        * @param partitions The list of partitions we need offsets for
+        * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
+        */
+       private static void getLastOffsetFromKafka(
+                       SimpleConsumer consumer,
+                       List<KafkaTopicPartitionState<TopicAndPartition>> partitions,
+                       long whichTime) throws IOException
+       {
+               Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+               for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+                       requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1));
+               }
+
+               int retries = 0;
+               OffsetResponse response;
+               while (true) {
+                       kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+                                       requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+                       response = consumer.getOffsetsBefore(request);
+
+                       if (response.hasError()) {
+                               StringBuilder exception = new StringBuilder();
+                               for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+                                       short code;
+                                       if ((code = response.errorCode(part.getTopic(), part.getPartition())) != ErrorMapping.NoError()) {
+                                               exception.append("\nException for topic=").append(part.getTopic())
+                                                               .append(" partition=").append(part.getPartition()).append(": ")
+                                                               .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
+                                       }
+                               }
+                               if (++retries >= 3) {
+                                       throw new IOException("Unable to get last offset for partitions " + partitions + ": "
+                                                       + exception.toString());
+                               } else {
+                                       LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception);
+                               }
+                       } else {
+                               break; // leave retry loop
+                       }
+               }
+
+               for (KafkaTopicPartitionState<TopicAndPartition> part: partitions) {
+                       final long offset = response.offsets(part.getTopic(), part.getPartition())[0];
+                       
+                       // the offset returned is that of the next record to fetch. because our state reflects the latest
+                       // successfully emitted record, we subtract one
+                       part.setOffset(offset - 1);
+               }
+       }
+}
\ No newline at end of file

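The invalidOffsetBehavior value that SimpleConsumerThread passes as whichTime to getLastOffsetFromKafka() is one of Kafka 0.8's special offset-request constants: -1 for the latest offset, -2 for the earliest. The mapping below from an "auto.offset.reset"-style setting is an assumption for illustration only, not code from the commit. Note also the offset bookkeeping above: the fetch request asks for partition.getOffset() + 1, and after an offset request the thread stores offset - 1, so the partition state always holds the last emitted record.

package org.apache.flink.streaming.connectors.kafka.internals;

import kafka.api.OffsetRequest;

// Hypothetical mapping from a consumer config value to the 'whichTime' constant
// expected by kafka.javaapi.OffsetRequest and getLastOffsetFromKafka().
final class InvalidOffsetBehaviorSketch {

    static long fromConfigValue(String autoOffsetReset) {
        if ("smallest".equals(autoOffsetReset) || "earliest".equals(autoOffsetReset)) {
            return OffsetRequest.EarliestTime();  // -2: reset to the oldest available record
        } else {
            return OffsetRequest.LatestTime();    // -1: reset to the upcoming record
        }
    }

    private InvalidOffsetBehaviorSketch() {}
}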
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index 328cab0..a1a81ed 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -24,7 +24,6 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import org.slf4j.Logger;
@@ -39,11 +38,9 @@ import java.util.Properties;
 /**
  * Handler for committing Kafka offsets to Zookeeper and to retrieve them again.
  */
-public class ZookeeperOffsetHandler implements OffsetHandler {
+public class ZookeeperOffsetHandler {
 
        private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
-       
-       private static final long OFFSET_NOT_SET = FlinkKafkaConsumerBase.OFFSET_NOT_SET;
 
        private final String groupId;
 
@@ -74,27 +71,40 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
                curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
                curatorClient.start();
        }
+       
+       // ------------------------------------------------------------------------
+       //  Offset access and manipulation
+       // ------------------------------------------------------------------------
 
-
-       @Override
-       public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception {
-               for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToCommit.entrySet()) {
+       /**
+        * Writes given set of offsets for Kafka partitions to ZooKeeper.
+        * 
+        * @param offsetsToWrite The offsets for the partitions to write.
+        * @throws Exception The method forwards exceptions.
+        */
+       public void writeOffsets(Map<KafkaTopicPartition, Long> offsetsToWrite) throws Exception {
+               for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToWrite.entrySet()) {
                        KafkaTopicPartition tp = entry.getKey();
                        long offset = entry.getValue();
-                       
+
                        if (offset >= 0) {
                                setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset);
                        }
                }
        }
 
-       @Override
+       /**
+        * 
+        * @param partitions The partitions to read offsets for.
+        * @return The mapping from partition to offset.
+        * @throws Exception This method forwards exceptions.
+        */
        public Map<KafkaTopicPartition, Long> getOffsets(List<KafkaTopicPartition> partitions) throws Exception {
                Map<KafkaTopicPartition, Long> ret = new HashMap<>(partitions.size());
                for (KafkaTopicPartition tp : partitions) {
-                       long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
+                       Long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition());
 
-                       if (offset != OFFSET_NOT_SET) {
+                       if (offset != null) {
                                LOG.info("Offset for TopicPartition {}:{} was set to {} in ZooKeeper. Seeking fetcher to that position.",
                                                tp.getTopic(), tp.getPartition(), offset);
                                ret.put(tp, offset);
@@ -103,7 +113,11 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
                return ret;
        }
 
-       @Override
+       /**
+        * Closes the offset handler.
+        * 
+        * @throws IOException Thrown, if the handler cannot be closed properly.
+        */
        public void close() throws IOException {
                curatorClient.close();
        }
@@ -120,7 +134,7 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
                curatorClient.setData().forPath(path, data);
        }
 
-       public static long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
+       public static Long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
                ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
                String path = topicDirs.consumerOffsetDir() + "/" + partition;
                curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
@@ -128,18 +142,20 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
                byte[] data = curatorClient.getData().forPath(path);
                
                if (data == null) {
-                       return OFFSET_NOT_SET;
+                       return null;
                } else {
                        String asString = new String(data);
                        if (asString.length() == 0) {
-                               return OFFSET_NOT_SET;
+                               return null;
                        } else {
                                try {
-                                       return Long.parseLong(asString);
-                               } catch (NumberFormatException e) {
-                                       throw new Exception(String.format(
-                                               "The offset in ZooKeeper for group '%s', topic '%s', partition %d is a malformed string: %s",
-                                               groupId, topic, partition, asString));
+                                       return Long.valueOf(asString);
+                               }
+                               catch (NumberFormatException e) {
+                                       LOG.error(
+                                                       "The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}",
+                                               groupId, topic, partition, asString);
+                                       return null;
                                }
                        }
                }

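With this change, getOffsetFromZooKeeper() signals "no usable offset" by returning null instead of the removed FlinkKafkaConsumerBase.OFFSET_NOT_SET sentinel, and a malformed ZooKeeper entry is now logged and treated as absent rather than failing the job. The caller-side sketch below is hypothetical (the helper and its fallback parameter are not part of the commit); it only shows how the nullable result is meant to be handled.

package org.apache.flink.streaming.connectors.kafka.internals;

import org.apache.curator.framework.CuratorFramework;

// Hypothetical helper: treating the nullable Long returned by getOffsetFromZooKeeper().
final class CommittedOffsetLookupSketch {

    static long committedOffsetOrDefault(
            CuratorFramework curatorClient,
            String groupId,
            String topic,
            int partition,
            long defaultOffset) throws Exception {

        Long committed = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
                curatorClient, groupId, topic, partition);

        // null now covers "no node", "empty node" and "malformed value"; the caller
        // chooses the fallback, e.g. one derived from 'auto.offset.reset'.
        return committed != null ? committed : defaultOffset;
    }

    private CommittedOffsetLookupSketch() {}
}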
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index d6ee968..0aef3bd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -35,8 +34,11 @@ import org.junit.Test;
 
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class Kafka08ITCase extends KafkaConsumerTestBase {
 
@@ -45,11 +47,6 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
        // ------------------------------------------------------------------------
 
        @Test(timeout = 60000)
-       public void testCheckpointing() throws Exception {
-               runCheckpointingTest();
-       }
-
-       @Test(timeout = 60000)
        public void testFailOnNoBroker() throws Exception {
                runFailOnNoBrokerTest();
        }
@@ -60,15 +57,15 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                runSimpleConcurrentProducerConsumerTopology();
        }
 
-       @Test(timeout = 60000)
-       public void testPunctuatedExplicitWMConsumer() throws Exception {
-               runExplicitPunctuatedWMgeneratingConsumerTest(false);
-       }
+//     @Test(timeout = 60000)
+//     public void testPunctuatedExplicitWMConsumer() throws Exception {
+//             runExplicitPunctuatedWMgeneratingConsumerTest(false);
+//     }
 
-       @Test(timeout = 60000)
-       public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-               runExplicitPunctuatedWMgeneratingConsumerTest(true);
-       }
+//     @Test(timeout = 60000)
+//     public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
+//             runExplicitPunctuatedWMgeneratingConsumerTest(true);
+//     }
 
        @Test(timeout = 60000)
        public void testKeyValueSupport() throws Exception {
@@ -164,7 +161,31 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                runMetricsAndEndOfStreamTest();
        }
 
+       @Test
+       public void runOffsetManipulationInZooKeeperTest() {
+               try {
+                       final String topicName = "ZookeeperOffsetHandlerTest-Topic";
+                       final String groupId = "ZookeeperOffsetHandlerTest-Group";
 
+                       final Long offset = (long) (Math.random() * Long.MAX_VALUE);
+
+                       CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient();
+                       kafkaServer.createTestTopic(topicName, 3, 2);
+
+                       ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset);
+
+                       Long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
+
+                       curatorFramework.close();
+
+                       assertEquals(offset, fetchedOffset);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
        /**
         * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
         *
@@ -202,15 +223,15 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
                CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
 
-               long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0);
-               long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1);
-               long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2);
+               Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 0);
+               Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 1);
+               Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorClient, standardProps.getProperty("group.id"), topicName, 2);
 
                LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
 
-               assertTrue(o1 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-               assertTrue(o2 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100));
-               assertTrue(o3 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100));
+               assertTrue(o1 == null || (o1 >= 0 && o1 <= 100));
+               assertTrue(o2 == null || (o2 >= 0 && o2 <= 100));
+               assertTrue(o3 == null || (o3 >= 0 && o3 <= 100));
 
                LOG.info("Manipulating offsets");
 
@@ -264,16 +285,16 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                // get the offset
                CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
 
-               long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
-               long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
-               long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
+               Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
+               Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
+               Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
                curatorFramework.close();
                LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
 
                // ensure that the offset has been committed
-               boolean atLeastOneOffsetSet = (o1 > 0 && o1 <= 100) ||
-                               (o2 > 0 && o2 <= 100) ||
-                               (o3 > 0 && o3 <= 100);
+               boolean atLeastOneOffsetSet = (o1 != null && o1 > 0 && o1 <= 100) ||
+                               (o2 != null && o2 > 0 && o2 <= 100) ||
+                               (o3 != null && o3 > 0 && o3 <= 100);
                assertTrue("Expecting at least one offset to be set o1="+o1+" o2="+o2+" o3="+o3, atLeastOneOffsetSet);
 
                deleteTestTopic(topicName);
@@ -295,7 +316,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
        public void testKafkaOffsetRetrievalToZookeeper() throws Exception {
                final String topicName = "testKafkaOffsetToZk";
                final int parallelism = 3;
+               
                createTestTopic(topicName, parallelism, 1);
+               
                StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
                env1.getConfig().disableSysoutLogging();
                env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
@@ -305,9 +328,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                writeSequence(env1, topicName, 50, parallelism);
 
 
-               StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-
-               // enable checkpointing
+               final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
                env2.getConfig().disableSysoutLogging();
                env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
                env2.setParallelism(parallelism);
@@ -320,78 +341,55 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                DataStream<String> stream = env2.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps));
                stream.addSink(new DiscardingSink<String>());
 
-               CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
-               String consumerGroupDir = standardProps.getProperty("group.id");
-               TreeCache tc1 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/0");
-               TreeCache tc2 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/1");
-               TreeCache tc3 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/2");
-
-               // add listener to wait until first partition is updated in ZK
-               TreeCacheListener stopListener = new TreeCacheListener() {
-                       AtomicInteger counter = new AtomicInteger(0);
-                       @Override
-                       public void childEvent(CuratorFramework client, 
TreeCacheEvent event) throws Exception {
-                               LOG.info("Updated {}", event);
-                               if 
(event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
-                                       if(counter.incrementAndGet() == 3) {
-                                               // cancel job, node has been 
created
-                                               LOG.info("Cancelling job after 
all three ZK nodes were updated");
-                                               
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-                                       }
-                               }
-                       }
-               };
-               tc1.getListenable().addListener(stopListener);
-               tc1.start();
-               tc2.getListenable().addListener(stopListener);
-               tc2.start();
-               tc3.getListenable().addListener(stopListener);
-               tc3.start();
-
-               // the curator listener is not always working properly. Stop 
job after 10 seconds
-               final Tuple1<Throwable> error = new Tuple1<>();
-               Thread canceller = new Thread(new Runnable() {
+               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
+               final Thread runner = new Thread("runner") {
                        @Override
                        public void run() {
                                try {
-                                       Thread.sleep(10_000L);
-                                       LOG.info("Cancelling job after 10 
seconds");
-                                       
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-                               } catch (Throwable t) {
-                                       if (!(t instanceof 
InterruptedException)) {
-                                               error.f0 = t;
+                                       env2.execute();
+                               }
+                               catch (Throwable t) {
+                                       if (!(t.getCause() instanceof 
JobCancellationException)) {
+                                               errorRef.set(t);
                                        }
                                }
                        }
-               });
-               canceller.start();
-
-               try {
-                       env2.execute("Idlying Kafka source");
-               } catch( Throwable thr) {
-                       if(!(thr.getCause() instanceof 
JobCancellationException)) {
-                               throw thr;
+               };
+               runner.start();
+
+               final CuratorFramework curatorFramework = 
((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
+               final Long l49 = 49L;
+                               
+               final long deadline = 30000 + System.currentTimeMillis();
+               do {
+                       Long o1 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 0);
+                       Long o2 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 1);
+                       Long o3 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 2);
+
+                       if (l49.equals(o1) && l49.equals(o2) && l49.equals(o3)) 
{
+                               break;
                        }
+                       
+                       Thread.sleep(100);
                }
-               tc1.close();
-               tc2.close();
-               tc3.close();
-
-               canceller.interrupt();
-               canceller.join();
-               if(error.f0 != null) {
-                       throw new RuntimeException("Delayed cancelling thread 
had an error", error.f0);
+               while (System.currentTimeMillis() < deadline);
+               
+               // cancel the job
+               
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+               
+               final Throwable t = errorRef.get();
+               if (t != null) {
+                       throw new RuntimeException("Job failed with an 
exception", t);
                }
 
                // check if offsets are correctly in ZK
-               long o1 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 0);
-               long o2 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 1);
-               long o3 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 2);
-               Assert.assertEquals(49L, o1);
-               Assert.assertEquals(49L, o2);
-               Assert.assertEquals(49L, o3);
+               Long o1 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 0);
+               Long o2 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 1);
+               Long o3 = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, 
standardProps.getProperty("group.id"), topicName, 2);
+               Assert.assertEquals(Long.valueOf(49L), o1);
+               Assert.assertEquals(Long.valueOf(49L), o2);
+               Assert.assertEquals(Long.valueOf(49L), o3);
 
                curatorFramework.close();
-
        }
 }
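Editor's note: the reworked testKafkaOffsetRetrievalToZookeeper above drops the Curator TreeCache listeners and the 10-second canceller thread in favour of a deadline-bounded polling loop: the test thread re-reads the three partition offsets from ZooKeeper every 100 ms and proceeds as soon as all of them equal 49 or 30 seconds have elapsed, then cancels the job explicitly. The following is a minimal sketch of that wait pattern in isolation; the PollUntil helper and its Callable-based signature are ours, not part of the patch.

import java.util.concurrent.Callable;

/**
 * Illustrative only: the deadline-bounded polling pattern used in the
 * rewritten test, factored into a reusable helper.
 */
public final class PollUntil {

	/**
	 * Re-evaluates the condition every pollIntervalMillis until it returns
	 * true or timeoutMillis have elapsed; returns whether it was satisfied.
	 */
	public static boolean pollUntil(Callable<Boolean> condition, long timeoutMillis, long pollIntervalMillis) throws Exception {
		final long deadline = System.currentTimeMillis() + timeoutMillis;
		do {
			if (Boolean.TRUE.equals(condition.call())) {
				return true;
			}
			Thread.sleep(pollIntervalMillis);
		}
		while (System.currentTimeMillis() < deadline);
		return false;
	}

	private PollUntil() {}
}

In the test itself, the condition corresponds to the three getOffsetFromZooKeeper calls all returning 49L, the timeout to the 30000 ms deadline, and the poll interval to the Thread.sleep(100).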

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
new file mode 100644
index 0000000..36fb7e6
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+public class KafkaConsumer08Test {
+
+       @Test
+       public void testValidateZooKeeperConfig() {
+               try {
+                       // empty
+                       Properties emptyProperties = new Properties();
+                       try {
+                               
FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties);
+                               fail("should fail with an exception");
+                       }
+                       catch (IllegalArgumentException e) {
+                               // expected
+                       }
+
+                       // no connect string (only group string)
+                       Properties noConnect = new Properties();
+                       noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, 
"flink-test-group");
+                       try {
+                               
FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect);
+                               fail("should fail with an exception");
+                       }
+                       catch (IllegalArgumentException e) {
+                               // expected
+                       }
+
+                       // no group string (only connect string)
+                       Properties noGroup = new Properties();
+                       noGroup.put("zookeeper.connect", "localhost:47574");
+                       try {
+                               
FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup);
+                               fail("should fail with an exception");
+                       }
+                       catch (IllegalArgumentException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testCreateSourceWithoutCluster() {
+               try {
+                       Properties props = new Properties();
+                       props.setProperty("zookeeper.connect", 
"localhost:56794");
+                       props.setProperty("bootstrap.servers", 
"localhost:11111, localhost:22222");
+                       props.setProperty("group.id", "non-existent-group");
+
+                       new 
FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new 
SimpleStringSchema(), props);
+                       fail();
+               }
+               catch (Exception e) {
+                       assertTrue(e.getMessage().contains("Unable to retrieve 
any partitions"));
+               }
+       }
+}
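Editor's note: the new KafkaConsumer08Test keeps the three negative cases for FlinkKafkaConsumer08.validateZooKeeperConfig from the deleted KafkaConsumerTest (empty properties, group.id without zookeeper.connect, zookeeper.connect without group.id), each expected to raise an IllegalArgumentException, and replaces the previously @Ignore'd testCreateSourceWithoutCluster with one that asserts the "Unable to retrieve any partitions" failure instead of swallowing it. The patch contains no positive case; a hypothetical one, written under the assumption that the validation only requires those two keys to be present, could look like this (the class name is ours, and it is placed in the same package so method visibility is not an issue):

package org.apache.flink.streaming.connectors.kafka;

import java.util.Properties;

import org.junit.Test;

public class KafkaConsumer08PositiveConfigSketch {

	@Test
	public void testValidateZooKeeperConfigAccepted() {
		// Hypothetical positive case, not part of the patch: assuming the
		// validation only requires these two keys to be present.
		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "localhost:2181");
		props.setProperty("group.id", "flink-test-group");

		// expected to return without throwing, under the assumption above
		FlinkKafkaConsumer08.validateZooKeeperConfig(props);
	}
}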

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
deleted file mode 100644
index 7337f65..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.commons.collections.map.LinkedMap;
-
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class KafkaConsumerTest {
-
-       @Test
-       public void testValidateZooKeeperConfig() {
-               try {
-                       // empty
-                       Properties emptyProperties = new Properties();
-                       try {
-                               
FlinkKafkaConsumer08.validateZooKeeperConfig(emptyProperties);
-                               fail("should fail with an exception");
-                       }
-                       catch (IllegalArgumentException e) {
-                               // expected
-                       }
-
-                       // no connect string (only group string)
-                       Properties noConnect = new Properties();
-                       noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, 
"flink-test-group");
-                       try {
-                               
FlinkKafkaConsumer08.validateZooKeeperConfig(noConnect);
-                               fail("should fail with an exception");
-                       }
-                       catch (IllegalArgumentException e) {
-                               // expected
-                       }
-
-                       // no group string (only connect string)
-                       Properties noGroup = new Properties();
-                       noGroup.put("zookeeper.connect", "localhost:47574");
-                       try {
-                               
FlinkKafkaConsumer08.validateZooKeeperConfig(noGroup);
-                               fail("should fail with an exception");
-                       }
-                       catch (IllegalArgumentException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testSnapshot() {
-               try {
-                       Field offsetsField = 
FlinkKafkaConsumerBase.class.getDeclaredField("partitionState");
-                       Field runningField = 
FlinkKafkaConsumerBase.class.getDeclaredField("running");
-                       Field mapField = 
FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
-
-                       offsetsField.setAccessible(true);
-                       runningField.setAccessible(true);
-                       mapField.setAccessible(true);
-
-                       FlinkKafkaConsumer08<?> consumer = 
mock(FlinkKafkaConsumer08.class);
-                       when(consumer.snapshotState(anyLong(), 
anyLong())).thenCallRealMethod();
-
-                       HashMap<KafkaTopicPartition, KafkaPartitionState> 
testState = new HashMap<>();
-                       HashMap<KafkaTopicPartition, Long> testOffsets = new 
HashMap<>();
-                       long[] offsets = new long[] { 43, 6146, 133, 16, 162, 
616 };
-                       int j = 0;
-                       for (long i: offsets) {
-                               KafkaTopicPartition ktp = new 
KafkaTopicPartition("topic", j++);
-                               testState.put(ktp, new 
KafkaPartitionState(ktp.getPartition(), i));
-                               testOffsets.put(ktp, i);
-                       }
-
-                       LinkedMap map = new LinkedMap();
-
-                       offsetsField.set(consumer, testState);
-                       runningField.set(consumer, true);
-                       mapField.set(consumer, map);
-
-                       assertTrue(map.isEmpty());
-
-                       // make multiple checkpoints
-                       for (long checkpointId = 10L; checkpointId <= 2000L; 
checkpointId += 9L) {
-                               HashMap<KafkaTopicPartition, Long> checkpoint = 
consumer.snapshotState(checkpointId, 47 * checkpointId);
-                               assertEquals(testOffsets, checkpoint);
-
-                               // change the offsets, make sure the snapshot 
did not change
-                               HashMap<KafkaTopicPartition, Long> 
checkpointCopy = (HashMap<KafkaTopicPartition, Long>) checkpoint.clone();
-
-                               for (Map.Entry<KafkaTopicPartition, Long> e: 
testOffsets.entrySet()) {
-                                       KafkaTopicPartition ktp = e.getKey();
-                                       testState.put(ktp, new 
KafkaPartitionState(ktp.getPartition(), e.getValue() + 1));
-                                       testOffsets.put(ktp, e.getValue() + 1);
-                               }
-
-                               assertEquals(checkpointCopy, checkpoint);
-
-                               assertTrue(map.size() > 0);
-                               assertTrue(map.size() <= 
FlinkKafkaConsumer08.MAX_NUM_PENDING_CHECKPOINTS);
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       @Ignore("Kafka consumer internally makes an infinite loop")
-       public void testCreateSourceWithoutCluster() {
-               try {
-                       Properties props = new Properties();
-                       props.setProperty("zookeeper.connect", 
"localhost:56794");
-                       props.setProperty("bootstrap.servers", 
"localhost:11111, localhost:22222");
-                       props.setProperty("group.id", "non-existent-group");
-
-                       new 
FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), new 
SimpleStringSchema(), props);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
index 21140dd..c28799c 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention08ITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.Test;
 
+@SuppressWarnings("serial")
 public class KafkaShortRetention08ITCase extends KafkaShortRetentionTestBase {
 
        @Test(timeout=60000)
@@ -28,6 +29,6 @@ public class KafkaShortRetention08ITCase extends 
KafkaShortRetentionTestBase {
 
        @Test(timeout=60000)
        public void testAutoOffsetResetNone() throws Exception {
-               runFailOnAutoOffsetResetNone();
+               runFailOnAutoOffsetResetNoneEager();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
deleted file mode 100644
index c99e133..0000000
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
-import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
-       
-       @Test
-       public void runOffsetManipulationinZooKeeperTest() {
-               try {
-                       final String topicName = 
"ZookeeperOffsetHandlerTest-Topic";
-                       final String groupId = 
"ZookeeperOffsetHandlerTest-Group";
-                       
-                       final long offset = (long) (Math.random() * 
Long.MAX_VALUE);
-
-                       CuratorFramework curatorFramework = 
((KafkaTestEnvironmentImpl)kafkaServer ).createCuratorClient();
-                       kafkaServer.createTestTopic(topicName, 3, 2);
-
-                       
ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, 
topicName, 0, offset);
-       
-                       long fetchedOffset = 
ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, 
topicName, 0);
-
-                       curatorFramework.close();
-                       
-                       assertEquals(offset, fetchedOffset);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
index 6bdfb48..fbeb110 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/resources/log4j-test.properties
@@ -25,5 +25,6 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] 
%-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 55f9875..d34cd2f 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -17,38 +17,31 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.util.SerializedValue;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.UUID;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel 
data stream from
@@ -73,10 +66,8 @@ import static java.util.Objects.requireNonNull;
  */
 public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 
-       // 
------------------------------------------------------------------------
-       
        private static final long serialVersionUID = 2324564345203409112L;
-       
+
        private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaConsumer09.class);
 
        /**  Configuration key to change the polling timeout **/
@@ -85,35 +76,18 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
        /** Boolean configuration key to disable metrics tracking **/
        public static final String KEY_DISABLE_METRICS = 
"flink.disable-metrics";
 
-       /**
-        * From Kafka's Javadoc: The time, in milliseconds, spent waiting in 
poll if data is not
-        * available. If 0, returns immediately with any records that are 
available now.
-        */
+       /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in 
poll if data is not
+        * available. If 0, returns immediately with any records that are 
available now. */
        public static final long DEFAULT_POLL_TIMEOUT = 100L;
 
+       // 
------------------------------------------------------------------------
+
        /** User-supplied properties for Kafka **/
        private final Properties properties;
-       /** Ordered list of all partitions available in all subscribed 
partitions **/
-       private final List<KafkaTopicPartition> partitionInfos;
-
-       /** Unique ID identifying the consumer */
-       private final String consumerId;
-
-       // ------  Runtime State  -------
-
-       /** The partitions actually handled by this consumer at runtime */
-       private transient List<TopicPartition> subscribedPartitions;
-       /** For performance reasons, we are keeping two representations of the 
subscribed partitions **/
-       private transient List<KafkaTopicPartition> subscribedPartitionsAsFlink;
-       /** The Kafka Consumer instance**/
-       private transient KafkaConsumer<byte[], byte[]> consumer;
-       /** The thread running Kafka's consumer **/
-       private transient ConsumerThread<T> consumerThread;
-       /** Exception set from the ConsumerThread */
-       private transient Throwable consumerThreadException;
-       /** If the consumer doesn't have a Kafka partition assigned at runtime, 
it'll block on this waitThread **/
-       private transient Thread waitThread;
 
+       /** From Kafka's Javadoc: The time, in milliseconds, spent waiting in 
poll if data is not
+        * available. If 0, returns immediately with any records that are 
available now */
+       private final long pollTimeout;
 
        // 
------------------------------------------------------------------------
 
@@ -177,14 +151,30 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
         *           The properties that are used to configure both the fetcher 
and the offset handler.
         */
        public FlinkKafkaConsumer09(List<String> topics, 
KeyedDeserializationSchema<T> deserializer, Properties props) {
-               super(deserializer, props);
-               requireNonNull(topics, "topics");
-               this.properties = requireNonNull(props, "props");
+               super(deserializer);
+
+               checkNotNull(topics, "topics");
+               this.properties = checkNotNull(props, "props");
                setDeserializer(this.properties);
+
+               // configure the polling timeout
+               try {
+                       if (properties.containsKey(KEY_POLL_TIMEOUT)) {
+                               this.pollTimeout = 
Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
+                       } else {
+                               this.pollTimeout = DEFAULT_POLL_TIMEOUT;
+                       }
+               }
+               catch (Exception e) {
+                       throw new IllegalArgumentException("Cannot parse poll 
timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
+               }
+
+               // read the partitions that belong to the listed topics
+               final List<KafkaTopicPartition> partitions = new ArrayList<>();
                KafkaConsumer<byte[], byte[]> consumer = null;
+
                try {
                        consumer = new KafkaConsumer<>(this.properties);
-                       this.partitionInfos = new ArrayList<>();
                        for (final String topic: topics) {
                                // get partitions for each topic
                                List<PartitionInfo> partitionsForTopic = null;
@@ -203,307 +193,93 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
                                        consumer.close();
                                        try {
                                                Thread.sleep(1000);
-                                       } catch (InterruptedException e) {
-                                       }
+                                       } catch (InterruptedException ignored) 
{}
+                                       
                                        consumer = new 
KafkaConsumer<>(properties);
                                }
                                // for non existing topics, the list might be 
null.
-                               if(partitionsForTopic != null) {
-                                       
partitionInfos.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
+                               if (partitionsForTopic != null) {
+                                       
partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
                                }
                        }
-               } finally {
+               }
+               finally {
                        if(consumer != null) {
                                consumer.close();
                        }
                }
-               if(partitionInfos.isEmpty()) {
+
+               if (partitions.isEmpty()) {
                        throw new RuntimeException("Unable to retrieve any 
partitions for the requested topics " + topics);
                }
 
                // we now have a list of partitions which is the same for all 
parallel consumer instances.
-               LOG.info("Got {} partitions from these topics: {}", 
partitionInfos.size(), topics);
+               LOG.info("Got {} partitions from these topics: {}", 
partitions.size(), topics);
 
                if (LOG.isInfoEnabled()) {
-                       logPartitionInfo(partitionInfos);
+                       logPartitionInfo(LOG, partitions);
                }
 
-               this.consumerId = UUID.randomUUID().toString();
+               // register these partitions
+               setSubscribedPartitions(partitions);
+       }
+
+       @Override
+       protected AbstractFetcher<T, ?> createFetcher(
+                       SourceContext<T> sourceContext,
+                       List<KafkaTopicPartition> thisSubtaskPartitions,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       StreamingRuntimeContext runtimeContext) throws 
Exception {
+
+               boolean useMetrics = 
!Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
+
+               return new Kafka09Fetcher<>(sourceContext, 
thisSubtaskPartitions,
+                               watermarksPeriodic, watermarksPunctuated,
+                               runtimeContext, deserializer,
+                               properties, pollTimeout, useMetrics);
+               
        }
 
+       // 
------------------------------------------------------------------------
+       //  Utilities 
+       // 
------------------------------------------------------------------------
 
        /**
         * Converts a list of Kafka PartitionInfo's to Flink's 
KafkaTopicPartition (which are serializable)
+        * 
         * @param partitions A list of Kafka PartitionInfos.
         * @return A list of KafkaTopicPartitions
         */
-       public static List<KafkaTopicPartition> 
convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
-               requireNonNull(partitions, "The given list of partitions was 
null");
+       private static List<KafkaTopicPartition> 
convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
+               checkNotNull(partitions);
+
                List<KafkaTopicPartition> ret = new 
ArrayList<>(partitions.size());
-               for(PartitionInfo pi: partitions) {
+               for (PartitionInfo pi : partitions) {
                        ret.add(new KafkaTopicPartition(pi.topic(), 
pi.partition()));
                }
                return ret;
        }
 
-       public static List<TopicPartition> 
convertToKafkaTopicPartition(List<KafkaTopicPartition> partitions) {
-               List<TopicPartition> ret = new ArrayList<>(partitions.size());
-               for(KafkaTopicPartition ktp: partitions) {
-                       ret.add(new TopicPartition(ktp.getTopic(), 
ktp.getPartition()));
-               }
-               return ret;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Source life cycle
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-
-               final int numConsumers = 
getRuntimeContext().getNumberOfParallelSubtasks();
-               final int thisConsumerIndex = 
getRuntimeContext().getIndexOfThisSubtask();
-
-               // pick which partitions we work on
-               this.subscribedPartitionsAsFlink = 
assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex);
-               if(this.subscribedPartitionsAsFlink.isEmpty()) {
-                       LOG.info("This consumer doesn't have any partitions 
assigned");
-                       this.partitionState = null;
-                       return;
-               } else {
-                       StreamingRuntimeContext streamingRuntimeContext = 
(StreamingRuntimeContext) getRuntimeContext();
-                       // if checkpointing is enabled, we are not 
automatically committing to Kafka.
-                       
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
Boolean.toString(!streamingRuntimeContext.isCheckpointingEnabled()));
-                       this.consumer = new KafkaConsumer<>(properties);
-               }
-               subscribedPartitions = 
convertToKafkaTopicPartition(subscribedPartitionsAsFlink);
-
-               this.consumer.assign(this.subscribedPartitions);
-
-               // register Kafka metrics to Flink accumulators
-               if(!Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, 
"false"))) {
-                       Map<MetricName, ? extends Metric> metrics = 
this.consumer.metrics();
-                       if(metrics == null) {
-                               // MapR's Kafka implementation returns null 
here.
-                               LOG.info("Consumer implementation does not 
support metrics");
-                       } else {
-                               for (Map.Entry<MetricName, ? extends Metric> 
metric : metrics.entrySet()) {
-                                       String name = consumerId + "-consumer-" 
+ metric.getKey().name();
-                                       DefaultKafkaMetricAccumulator 
kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue());
-                                       // best effort: we only add the 
accumulator if available.
-                                       if (kafkaAccumulator != null) {
-                                               
getRuntimeContext().addAccumulator(name, kafkaAccumulator);
-                                       }
-                               }
-                       }
-               }
-
-               // check if we need to explicitly seek to a specific offset 
(restore case)
-               if(restoreToOffset != null) {
-                       // we are in a recovery scenario
-                       for(Map.Entry<KafkaTopicPartition, Long> info: 
restoreToOffset.entrySet()) {
-                               // seek all offsets to the right position
-                               this.consumer.seek(new 
TopicPartition(info.getKey().getTopic(), info.getKey().getPartition()), 
info.getValue() + 1);
-                       }
-                       this.partitionState = restoreInfoFromCheckpoint();
-               } else {
-                       this.partitionState = new HashMap<>();
-               }
-       }
-
-
-
-       @Override
-       public void run(SourceContext<T> sourceContext) throws Exception {
-               if(consumer != null) {
-                       consumerThread = new ConsumerThread<>(this, 
sourceContext);
-                       consumerThread.setDaemon(true);
-                       consumerThread.start();
-                       // wait for the consumer to stop
-                       while(consumerThread.isAlive()) {
-                               if(consumerThreadException != null) {
-                                       throw new 
RuntimeException("ConsumerThread threw an exception: " + 
consumerThreadException.getMessage(), consumerThreadException);
-                               }
-                               try {
-                                       consumerThread.join(50);
-                               } catch (InterruptedException ie) {
-                                       consumerThread.shutdown();
-                               }
-                       }
-                       // check again for an exception
-                       if(consumerThreadException != null) {
-                               throw new RuntimeException("ConsumerThread 
threw an exception: " + consumerThreadException.getMessage(), 
consumerThreadException);
-                       }
-               } else {
-                       // this source never completes, so emit a 
Long.MAX_VALUE watermark
-                       // to not block watermark forwarding
-                       sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
-
-                       final Object waitLock = new Object();
-                       this.waitThread = Thread.currentThread();
-                       while (running) {
-                               // wait until we are canceled
-                               try {
-                                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
-                                       synchronized (waitLock) {
-                                               waitLock.wait();
-                                       }
-                               }
-                               catch (InterruptedException e) {
-                                       // do nothing, check our "running" 
status
-                               }
-                       }
-               }
-               // close the context after the work was done. this can actually 
only
-               // happen when the fetcher decides to stop fetching
-               sourceContext.close();
-       }
-
-       @Override
-       public void cancel() {
-               // set ourselves as not running
-               running = false;
-               if(this.consumerThread != null) {
-                       this.consumerThread.shutdown();
-               } else {
-                       // the consumer thread is not running, so we have to 
interrupt our own thread
-                       if(waitThread != null) {
-                               waitThread.interrupt();
-                       }
-               }
-       }
-
-       @Override
-       public void close() throws Exception {
-               cancel();
-               super.close();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Checkpoint and restore
-       // 
------------------------------------------------------------------------
-
-
-       @Override
-       protected void commitOffsets(HashMap<KafkaTopicPartition, Long> 
checkpointOffsets) {
-               if(!running) {
-                       LOG.warn("Unable to commit offsets on closed consumer");
-                       return;
-               }
-               Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = 
convertToCommitMap(checkpointOffsets);
-               synchronized (this.consumer) {
-                       this.consumer.commitSync(kafkaCheckpointOffsets);
-               }
-       }
-
-       public static Map<TopicPartition, OffsetAndMetadata> 
convertToCommitMap(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
-               Map<TopicPartition, OffsetAndMetadata> ret = new 
HashMap<>(checkpointOffsets.size());
-               for(Map.Entry<KafkaTopicPartition, Long> partitionOffset: 
checkpointOffsets.entrySet()) {
-                       ret.put(new 
TopicPartition(partitionOffset.getKey().getTopic(), 
partitionOffset.getKey().getPartition()),
-                                       new 
OffsetAndMetadata(partitionOffset.getValue(), ""));
-               }
-               return ret;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Miscellaneous utilities 
-       // 
------------------------------------------------------------------------
-
-
-       protected static void setDeserializer(Properties props) {
-               if 
(!props.contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
-                       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getCanonicalName());
-               } else {
-                       LOG.warn("Overwriting the '{}' is not recommended", 
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-               }
-
-               if 
(!props.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
-                       
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getCanonicalName());
-               } else {
-                       LOG.warn("Overwriting the '{}' is not recommended", 
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-               }
-       }
-
        /**
-        * We use a separate thread for executing the 
KafkaConsumer.poll(timeout) call because Kafka is not
-        * handling interrupts properly. On an interrupt (which happens 
automatically by Flink if the task
-        * doesn't react to cancel() calls), the poll() method might never 
return.
-        * On cancel, we'll wakeup the .poll() call and wait for it to return
+        * Makes sure that the ByteArrayDeserializer is registered in the Kafka 
properties.
+        * 
+        * @param props The Kafka properties to register the serializer in.
         */
-       private static class ConsumerThread<T> extends Thread {
-               private final FlinkKafkaConsumer09<T> flinkKafkaConsumer;
-               private final SourceContext<T> sourceContext;
-               private boolean running = true;
-
-               public ConsumerThread(FlinkKafkaConsumer09<T> 
flinkKafkaConsumer, SourceContext<T> sourceContext) {
-                       this.flinkKafkaConsumer = flinkKafkaConsumer;
-                       this.sourceContext = sourceContext;
-               }
+       private static void setDeserializer(Properties props) {
+               final String deSerName = 
ByteArrayDeserializer.class.getCanonicalName();
 
-               @Override
-               public void run() {
-                       try {
-                               long pollTimeout = 
Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, 
Long.toString(DEFAULT_POLL_TIMEOUT)));
-                               pollLoop: while (running) {
-                                       ConsumerRecords<byte[], byte[]> records;
-                                       //noinspection 
SynchronizeOnNonFinalField
-                                       synchronized 
(flinkKafkaConsumer.consumer) {
-                                               try {
-                                                       records = 
flinkKafkaConsumer.consumer.poll(pollTimeout);
-                                               } catch (WakeupException we) {
-                                                       if (running) {
-                                                               throw we;
-                                                       }
-                                                       // leave loop
-                                                       continue;
-                                               }
-                                       }
-                                       // get the records for each topic 
partition
-                                       for (int i = 0; i < 
flinkKafkaConsumer.subscribedPartitions.size(); i++) {
-                                               TopicPartition partition = 
flinkKafkaConsumer.subscribedPartitions.get(i);
-                                               KafkaTopicPartition 
flinkPartition = flinkKafkaConsumer.subscribedPartitionsAsFlink.get(i);
-                                               List<ConsumerRecord<byte[], 
byte[]>> partitionRecords = records.records(partition);
-                                               //noinspection 
ForLoopReplaceableByForEach
-                                               for (int j = 0; j < 
partitionRecords.size(); j++) {
-                                                       ConsumerRecord<byte[], 
byte[]> record = partitionRecords.get(j);
-                                                       T value = 
flinkKafkaConsumer.deserializer.deserialize(record.key(), record.value(), 
record.topic(), record.partition(),record.offset());
-                                                       
if(flinkKafkaConsumer.deserializer.isEndOfStream(value)) {
-                                                               // end of 
stream signaled
-                                                               running = false;
-                                                               break pollLoop;
-                                                       }
-                                                       synchronized 
(sourceContext.getCheckpointLock()) {
-                                                               
flinkKafkaConsumer.processElement(sourceContext, flinkPartition, value, 
record.offset());
-                                                       }
-                                               }
-                                       }
-                               }
-                       } catch(Throwable t) {
-                               if(running) {
-                                       
this.flinkKafkaConsumer.stopWithError(t);
-                               } else {
-                                       LOG.debug("Stopped ConsumerThread threw 
exception", t);
-                               }
-                       } finally {
-                               try {
-                                       flinkKafkaConsumer.consumer.close();
-                               } catch(Throwable t) {
-                                       LOG.warn("Error while closing 
consumer", t);
-                               }
-                       }
-               }
+               Object keyDeSer = 
props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+               Object valDeSer = 
props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
 
-               /**
-                * Try to shutdown the thread
-                */
-               public void shutdown() {
-                       this.running = false;
-                       this.flinkKafkaConsumer.consumer.wakeup();
+               if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
+                       LOG.warn("Ignoring configured key DeSerializer ({})", 
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+               }
+               if (valDeSer != null && !valDeSer.equals(deSerName)) {
+                       LOG.warn("Ignoring configured value DeSerializer ({})", 
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                }
-       }
 
-       private void stopWithError(Throwable t) {
-               this.consumerThreadException = t;
+               props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
deSerName);
+               props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deSerName);
        }
 }
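Editor's note: beyond the restructuring, the new setDeserializer quietly fixes a pitfall in the removed version. The old code guarded the defaults with props.contains(...), but java.util.Properties inherits contains(Object) from Hashtable, where it tests values rather than keys, so that guard could not detect a user-supplied deserializer by its key. The replacement reads the two keys with props.get(...), warns if something other than ByteArrayDeserializer was configured, and then sets both keys unconditionally. A minimal sketch of the pitfall, independent of Kafka (the class name is ours):

import java.util.Properties;

/**
 * Illustrative only: why the old presence check was ineffective.
 * Properties inherits contains(Object) from Hashtable, which tests VALUES,
 * not keys.
 */
public class ContainsVsContainsKey {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("key.deserializer", "SomeDeserializer");

		System.out.println(props.contains("key.deserializer"));    // false: no value equals this string
		System.out.println(props.contains("SomeDeserializer"));    // true:  it is a value
		System.out.println(props.containsKey("key.deserializer")); // true:  it is a key
	}
}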
