[FLINK-5048] [kafka consumer] Change thread model of FlinkKafkaConsumer to
better handle shutdown/interrupt situations
Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate
thread that operated Kafka's consumer.
That thread was shielded from interrupts, because the Kafka consumer does not
handle thread interrupts well.
Since that thread was also the thread that emitted records, it could block in
the network stack (backpressure) or in chained operators.
The latter case led to situations where cancellation became very slow unless
that thread was interrupted (which it could not be).
This commit changes the thread model:
- A spawned consumer thread polls a batch of records from the KafkaConsumer
  and pushes the batch of records into a blocking queue (size one).
- The main thread of the task pulls the record batches from the blocking
  queue and emits the records.
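
The hand-over between the two threads is essentially a size-one blocking
exchange. The following is only a minimal sketch of that pattern (class and
method names are illustrative); the Handover class added by this commit
additionally handles error reporting, closing, and waking up the producer
without Thread.interrupt().

    import org.apache.kafka.clients.consumer.ConsumerRecords;

    final class HandoverSketch {
        private final Object lock = new Object();
        private ConsumerRecords<byte[], byte[]> next;

        // Called by the consumer thread after each KafkaConsumer.poll();
        // blocks while the previous batch has not been picked up yet.
        void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
            synchronized (lock) {
                while (next != null) {
                    lock.wait();
                }
                next = records;
                lock.notifyAll();
            }
        }

        // Called by the task's main thread, which then deserializes and emits the records.
        ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
            synchronized (lock) {
                while (next == null) {
                    lock.wait();
                }
                ConsumerRecords<byte[], byte[]> batch = next;
                next = null;
                lock.notifyAll();
                return batch;
            }
        }
    }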
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a66e7ad1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a66e7ad1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a66e7ad1
Branch: refs/heads/master
Commit: a66e7ad14e41fa07737f447d68920ad5cc4ed6d3
Parents: fa1864c
Author: Stephan Ewen <[email protected]>
Authored: Thu Nov 10 11:13:43 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Nov 16 19:08:07 2016 +0100
----------------------------------------------------------------------
.../org/apache/flink/util/ExceptionUtils.java | 20 +
.../kafka/internal/Kafka010Fetcher.java | 7 +-
.../internal/KafkaConsumerCallBridge010.java | 40 ++
.../connectors/kafka/Kafka010FetcherTest.java | 172 ++++++++-
.../kafka/KafkaShortRetention010ITCase.java | 34 --
.../connectors/kafka/internal/Handover.java | 214 ++++++++++
.../kafka/internal/Kafka09Fetcher.java | 274 +++----------
.../kafka/internal/KafkaConsumerCallBridge.java | 41 ++
.../kafka/internal/KafkaConsumerThread.java | 332 ++++++++++++++++
.../connectors/kafka/Kafka09FetcherTest.java | 164 +++++++-
.../kafka/KafkaShortRetention09ITCase.java | 34 --
.../connectors/kafka/internal/HandoverTest.java | 387 +++++++++++++++++++
.../kafka/KafkaShortRetentionTestBase.java | 1 +
13 files changed, 1422 insertions(+), 298 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index cc7f56f..32bc1d2 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -100,6 +100,26 @@ public final class ExceptionUtils {
}
/**
+ * Throws the given {@code Throwable} in scenarios where the signatures allow
+ * throwing an Exception. Errors and Exceptions are thrown directly; other
+ * "exotic" subclasses of Throwable are wrapped in an Exception.
+ *
+ * @param t The throwable to be thrown.
+ * @param parentMessage The message for the parent Exception, if one is
needed.
+ */
+ public static void rethrowException(Throwable t, String parentMessage)
throws Exception {
+ if (t instanceof Error) {
+ throw (Error) t;
+ }
+ else if (t instanceof Exception) {
+ throw (Exception) t;
+ }
+ else {
+ throw new Exception(parentMessage, t);
+ }
+ }
+
+ /**
* Tries to throw the given {@code Throwable} in scenarios where the
signatures allows only IOExceptions
* (and RuntimeException and Error). Throws this exception directly, if
it is an IOException,
* a RuntimeException, or an Error. Otherwise does nothing.
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 024cd38..71dd29a 100644
---
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -29,7 +29,6 @@ import
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
@@ -91,11 +90,11 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
/**
* This method needs to be overridden because Kafka broke binary
compatibility between 0.9 and 0.10,
- * changing the List in the signature to a Collection.
+ * changing binary signatures
*/
@Override
- protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer,
List<TopicPartition> topicPartitions) {
- consumer.assign(topicPartitions);
+ protected KafkaConsumerCallBridge010 createCallBridge() {
+ return new KafkaConsumerCallBridge010();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
new file mode 100644
index 0000000..a81b098
--- /dev/null
+++
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls the {@link
KafkaConsumer#assign(java.util.Collection)} method.
+ *
+ * This indirection is necessary, because Kafka broke binary compatibility
between 0.9 and 0.10,
+ * changing {@code assign(List)} to {@code assign(Collection)}.
+ *
+ * Because of that, we need two versions whose compiled code goes against
different method signatures.
+ */
+public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
+
+ @Override
+ public void assignPartitions(KafkaConsumer<?, ?> consumer,
List<TopicPartition> topicPartitions) throws Exception {
+ consumer.assign(topicPartitions);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 037d25b..6ee0429 100644
---
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -20,16 +20,20 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
+import
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -45,6 +49,7 @@ import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -54,6 +59,7 @@ import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -69,7 +75,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
* Unit tests for the {@link Kafka010Fetcher}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest(Kafka010Fetcher.class)
+@PrepareForTest(KafkaConsumerThread.class)
public class Kafka010FetcherTest {
@Test
@@ -125,7 +131,7 @@ public class Kafka010FetcherTest {
getClass().getClassLoader(),
false, /* checkpointing */
"taskname-with-subtask",
- mock(MetricGroup.class),
+ new UnregisteredMetricsGroup(),
schema,
new Properties(),
0L,
@@ -174,9 +180,13 @@ public class Kafka010FetcherTest {
fetcherRunner.join();
// check that there were no errors in the fetcher
- final Throwable caughtError = error.get();
- if (caughtError != null) {
- throw new Exception("Exception in the fetcher", caughtError);
+ final Throwable fetcherError = error.get();
+ if (fetcherError != null && !(fetcherError instanceof
Handover.ClosedException)) {
+ throw new Exception("Exception in the fetcher", fetcherError);
+ }
+ final Throwable committerError = commitError.get();
+ if (committerError != null) {
+ throw new Exception("Exception in the committer", committerError);
}
}
@@ -258,7 +268,7 @@ public class Kafka010FetcherTest {
getClass().getClassLoader(),
false, /* checkpointing */
"taskname-with-subtask",
- mock(MetricGroup.class),
+ new UnregisteredMetricsGroup(),
schema,
new Properties(),
0L,
@@ -321,8 +331,154 @@ public class Kafka010FetcherTest {
// check that there were no errors in the fetcher
final Throwable caughtError = error.get();
- if (caughtError != null) {
+ if (caughtError != null && !(caughtError instanceof
Handover.ClosedException)) {
throw new Exception("Exception in the fetcher", caughtError);
}
}
+
+ @Test
+ public void testCancellationWhenEmitBlocks() throws Exception {
+
+ // ----- some test data -----
+
+ final String topic = "test-topic";
+ final int partition = 3;
+ final byte[] payload = new byte[] {1, 2, 3, 4};
+
+ final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 15,
payload, payload),
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 16,
payload, payload),
+ new ConsumerRecord<byte[], byte[]>(topic, partition, 17,
payload, payload));
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data =
new HashMap<>();
+ data.put(new TopicPartition(topic, partition), records);
+
+ final ConsumerRecords<byte[], byte[]> consumerRecords = new
ConsumerRecords<>(data);
+
+ // ----- the test consumer -----
+
+ final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+ when(mockConsumer.poll(anyLong())).thenAnswer(new
Answer<ConsumerRecords<?, ?>>() {
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+ return consumerRecords;
+ }
+ });
+
+
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- build a fetcher -----
+
+ BlockingSourceContext<String> sourceContext = new
BlockingSourceContext<>();
+ List<KafkaTopicPartition> topics = Collections.singletonList(new
KafkaTopicPartition(topic, partition));
+ KeyedDeserializationSchema<String> schema = new
KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+ sourceContext,
+ topics,
+ null, /* periodic watermark extractor */
+ null, /* punctuated watermark extractor */
+ new TestProcessingTimeService(),
+ 10, /* watermark interval */
+ this.getClass().getClassLoader(),
+ true, /* checkpointing */
+ "task_name",
+ new UnregisteredMetricsGroup(),
+ schema,
+ new Properties(),
+ 0L,
+ false);
+
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+ // wait until the thread started to emit records to the source context
+ sourceContext.waitTillHasBlocker();
+
+ // now we try to cancel the fetcher, including the interruption
usually done on the task thread
+ // once it has finished, there must be no more thread blocked on the
source context
+ fetcher.cancel();
+ fetcherRunner.interrupt();
+ fetcherRunner.join();
+
+ assertFalse("fetcher threads did not properly finish",
sourceContext.isStillBlocking());
+ }
+
+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
+
+ private static final class BlockingSourceContext<T> implements
SourceContext<T> {
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final OneShotLatch inBlocking = new OneShotLatch();
+
+ @Override
+ public void collect(T element) {
+ block();
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ block();
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ block();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return new Object();
+ }
+
+ @Override
+ public void close() {}
+
+ public void waitTillHasBlocker() throws InterruptedException {
+ inBlocking.await();
+ }
+
+ public boolean isStillBlocking() {
+ return lock.isLocked();
+ }
+
+ @SuppressWarnings({"InfiniteLoopStatement",
"SynchronizationOnLocalVariableOrMethodParameter"})
+ private void block() {
+ lock.lock();
+ try {
+ inBlocking.trigger();
+
+ // put this thread to sleep indefinitely
+ final Object o = new Object();
+ while (true) {
+ synchronized (o) {
+ o.wait();
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ // exit cleanly, simply reset the interruption flag
+ Thread.currentThread().interrupt();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
deleted file mode 100644
index 1d36198..0000000
---
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention010ITCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention010ITCase extends KafkaShortRetentionTestBase {
-
- @Test(timeout=60000)
- public void testAutoOffsetReset() throws Exception {
- runAutoOffsetResetTest();
- }
-
- @Test(timeout=60000)
- public void testAutoOffsetResetNone() throws Exception {
- runFailOnAutoOffsetResetNone();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
new file mode 100644
index 0000000..e6e3c51
--- /dev/null
+++
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Handover is a utility to hand over data (a buffer of records) and
exception from a
+ * <i>producer</i> thread to a <i>consumer</i> thread. It effectively behaves
like a
+ * "size one blocking queue", with some extras around exception reporting,
closing, and
+ * waking up thread without {@link Thread#interrupt() interrupting} threads.
+ *
+ * <p>This class is used in the Flink Kafka Consumer to hand over data and
exceptions between
+ * the thread that runs the KafkaConsumer class and the main thread.
+ *
+ * <p>The Handover has the notion of "waking up" the producer thread with a
{@link WakeupException}
+ * rather than a thread interrupt.
+ *
+ * <p>The Handover can also be "closed", signalling from one thread to the other
+ * that the thread has terminated.
+ */
+@ThreadSafe
+public final class Handover implements Closeable {
+
+ private final Object lock = new Object();
+
+ private ConsumerRecords<byte[], byte[]> next;
+ private Throwable error;
+ private boolean wakeupProducer;
+
+ /**
+ * Polls the next element from the Handover, possibly blocking until
the next element is
+ * available. This method behaves similar to polling from a blocking
queue.
+ *
+ * <p>If an exception was handed in by the producer ({@link
#reportError(Throwable)}), then
+ * that exception is thrown rather than an element being returned.
+ *
+ * @return The next element (buffer of records, never null).
+ *
+ * @throws ClosedException Thrown if the Handover was {@link #close()
closed}.
+ * @throws Exception Rethrows exceptions from the {@link
#reportError(Throwable)} method.
+ */
+ @Nonnull
+ public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
+ synchronized (lock) {
+ while (next == null && error == null) {
+ lock.wait();
+ }
+
+ ConsumerRecords<byte[], byte[]> n = next;
+ if (n != null) {
+ next = null;
+ lock.notifyAll();
+ return n;
+ }
+ else {
+ ExceptionUtils.rethrowException(error,
error.getMessage());
+
+ // this statement cannot be reached since the
above method always throws an exception
+ // this is only here to silence the compiler
and any warnings
+ return ConsumerRecords.empty();
+ }
+ }
+ }
+
+ /**
+ * Hands over an element from the producer. If the Handover already has
an element that was
+ * not yet picked up by the consumer thread, this call blocks until the
consumer picks up that
+ * previous element.
+ *
+ * <p>This behavior is similar to a "size one" blocking queue.
+ *
+ * @param element The next element to hand over.
+ *
+ * @throws InterruptedException
+ * Thrown, if the thread is interrupted while blocking
for the Handover to be empty.
+ * @throws WakeupException
+ * Thrown, if the {@link #wakeupProducer()} method is
called while blocking for
+ * the Handover to be empty.
+ * @throws ClosedException
+ * Thrown if the Handover was closed or concurrently
being closed.
+ */
+ public void produce(final ConsumerRecords<byte[], byte[]> element)
+ throws InterruptedException, WakeupException,
ClosedException {
+
+ checkNotNull(element);
+
+ synchronized (lock) {
+ while (next != null && !wakeupProducer) {
+ lock.wait();
+ }
+
+ wakeupProducer = false;
+
+ // if there is still an element, we must have been
woken up
+ if (next != null) {
+ throw new WakeupException();
+ }
+ // if there is no error, then this is open and can
accept this element
+ else if (error == null) {
+ next = element;
+ lock.notifyAll();
+ }
+ // an error marks this as closed for the producer
+ else {
+ throw new ClosedException();
+ }
+ }
+ }
+
+ /**
+ * Reports an exception. The consumer will throw the given exception
immediately, if
+ * it is currently blocked in the {@link #pollNext()} method, or the
next time it
+ * calls that method.
+ *
+ * <p>After this method has been called, no call to either {@link
#produce(ConsumerRecords)}
+ * or {@link #pollNext()} will ever return regularly any more, but will
always return
+ * exceptionally.
+ *
+ * <p>If another exception was already reported, this method does
nothing.
+ *
+ * <p>For the producer, the Handover will appear as if it was {@link
#close() closed}.
+ *
+ * @param t The exception to report.
+ */
+ public void reportError(Throwable t) {
+ checkNotNull(t);
+
+ synchronized (lock) {
+ // do not override the initial exception
+ if (error == null) {
+ error = t;
+ }
+ next = null;
+ lock.notifyAll();
+ }
+ }
+
+ /**
+ * Closes the handover. Both the {@link #produce(ConsumerRecords)}
method and the
+ * {@link #pollNext()} will throw a {@link ClosedException} on any
currently blocking and
+ * future invocations.
+ *
+ * <p>If an exception was previously reported via the {@link
#reportError(Throwable)} method,
+ * that exception will not be overridden. The consumer thread will
throw that exception upon
+ * calling {@link #pollNext()}, rather than the {@code ClosedException}.
+ */
+ @Override
+ public void close() {
+ synchronized (lock) {
+ next = null;
+ wakeupProducer = false;
+
+ if (error == null) {
+ error = new ClosedException();
+ }
+ lock.notifyAll();
+ }
+ }
+
+ /**
+ * Wakes the producer thread up. If the producer thread is currently
blocked in
+ * the {@link #produce(ConsumerRecords)} method, it will exit the
method throwing
+ * a {@link WakeupException}.
+ */
+ public void wakeupProducer() {
+ synchronized (lock) {
+ wakeupProducer = true;
+ lock.notifyAll();
+ }
+ }
+
+ //
------------------------------------------------------------------------
+
+ /**
+ * An exception thrown by the Handover in the {@link #pollNext()} or
+ * {@link #produce(ConsumerRecords)} method, after the Handover was
closed via
+ * {@link #close()}.
+ */
+ public static final class ClosedException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ * A special exception thrown by the Handover in the {@link
#produce(ConsumerRecords)}
+ * method when the producer is woken up from a blocking call via {@link
#wakeupProducer()}.
+ */
+ public static final class WakeupException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index acdcb61..d495327 100644
---
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -23,10 +23,8 @@ import
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
-import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
-import
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
@@ -34,30 +32,23 @@ import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
/**
* A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer
API.
*
* @param <T> The type of elements produced by the fetcher.
*/
-public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition>
implements Runnable {
+public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
private static final Logger LOG =
LoggerFactory.getLogger(Kafka09Fetcher.class);
@@ -66,36 +57,15 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T,
TopicPartition> implem
/** The schema to convert between Kafka's byte messages, and Flink's
objects */
private final KeyedDeserializationSchema<T> deserializer;
- /** The configuration for the Kafka consumer */
- private final Properties kafkaProperties;
+ /** The handover of data and exceptions between the consumer thread and
the task thread */
+ private final Handover handover;
- /** The maximum number of milliseconds to wait for a fetch batch */
- private final long pollTimeout;
-
- /** The next offsets that the main thread should commit */
- private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>>
nextOffsetsToCommit;
-
- /** The callback invoked by Kafka once an offset commit is complete */
- private final OffsetCommitCallback offsetCommitCallback;
-
- /** Reference to the Kafka consumer, once it is created */
- private volatile KafkaConsumer<byte[], byte[]> consumer;
-
- /** Reference to the proxy, forwarding exceptions from the fetch thread
to the main thread */
- private volatile ExceptionProxy errorHandler;
+ /** The thread that runs the actual KafkaConsumer and hands the record
batches to this fetcher */
+ private final KafkaConsumerThread consumerThread;
/** Flag to mark the main work loop as alive */
private volatile boolean running = true;
- /** Flag tracking whether the latest commit request has completed */
- private volatile boolean commitInProgress;
-
- /** For Debug output **/
- private String taskNameWithSubtasks;
-
- /** We get this from the outside to publish metrics. **/
- private MetricGroup metricGroup;
-
//
------------------------------------------------------------------------
public Kafka09Fetcher(
@@ -125,16 +95,26 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T,
TopicPartition> implem
useMetrics);
this.deserializer = deserializer;
- this.kafkaProperties = kafkaProperties;
- this.pollTimeout = pollTimeout;
- this.nextOffsetsToCommit = new AtomicReference<>();
- this.offsetCommitCallback = new CommitCallback();
- this.taskNameWithSubtasks = taskNameWithSubtasks;
- this.metricGroup = metricGroup;
+ this.handover = new Handover();
+
+ final MetricGroup kafkaMetricGroup =
metricGroup.addGroup("KafkaConsumer");
+ addOffsetStateGauge(kafkaMetricGroup);
// if checkpointing is enabled, we are not automatically
committing to Kafka.
-
kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ kafkaProperties.setProperty(
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
Boolean.toString(!enableCheckpointing));
+
+ this.consumerThread = new KafkaConsumerThread(
+ LOG,
+ handover,
+ kafkaProperties,
+ subscribedPartitions(),
+ kafkaMetricGroup,
+ createCallBridge(),
+ getFetcherName() + " for " +
taskNameWithSubtasks,
+ pollTimeout,
+ useMetrics);
}
//
------------------------------------------------------------------------
@@ -143,133 +123,26 @@ public class Kafka09Fetcher<T> extends
AbstractFetcher<T, TopicPartition> implem
@Override
public void runFetchLoop() throws Exception {
- this.errorHandler = new ExceptionProxy(Thread.currentThread());
-
- // rather than running the main fetch loop directly here, we
spawn a dedicated thread
- // this makes sure that no interrupt() call upon canceling
reaches the Kafka consumer code
- Thread runner = new Thread(this, getFetcherName() + " for " +
taskNameWithSubtasks);
- runner.setDaemon(true);
- runner.start();
-
try {
- runner.join();
- } catch (InterruptedException e) {
- // may be the result of a wake-up after an exception.
we ignore this here and only
- // restore the interruption state
- Thread.currentThread().interrupt();
- }
-
- // make sure we propagate any exception that occurred in the
concurrent fetch thread,
- // before leaving this method
- this.errorHandler.checkAndThrowException();
- }
-
- @Override
- public void cancel() {
- // flag the main thread to exit
- running = false;
-
- // NOTE:
- // - We cannot interrupt the runner thread, because the Kafka
consumer may
- // deadlock when the thread is interrupted while in certain
methods
- // - We cannot call close() on the consumer, because it will
actually throw
- // an exception if a concurrent call is in progress
-
- // make sure the consumer finds out faster that we are shutting
down
- if (consumer != null) {
- consumer.wakeup();
- }
- }
-
- @Override
- public void run() {
- // This method initializes the KafkaConsumer and guarantees it
is torn down properly.
- // This is important, because the consumer has multi-threading
issues,
- // including concurrent 'close()' calls.
-
- final KafkaConsumer<byte[], byte[]> consumer;
- try {
- consumer = new KafkaConsumer<>(kafkaProperties);
- }
- catch (Throwable t) {
- running = false;
- errorHandler.reportError(t);
- return;
- }
-
- // from here on, the consumer will be closed properly
- try {
- assignPartitionsToConsumer(consumer,
convertKafkaPartitions(subscribedPartitions()));
-
- if (useMetrics) {
- final MetricGroup kafkaMetricGroup =
metricGroup.addGroup("KafkaConsumer");
- addOffsetStateGauge(kafkaMetricGroup);
- // register Kafka metrics to Flink
- Map<MetricName, ? extends Metric> metrics =
consumer.metrics();
- if (metrics == null) {
- // MapR's Kafka implementation returns
null here.
- LOG.info("Consumer implementation does
not support metrics");
- } else {
- // we have Kafka metrics, register them
- for (Map.Entry<MetricName, ? extends
Metric> metric: metrics.entrySet()) {
-
kafkaMetricGroup.gauge(metric.getKey().name(), new
KafkaMetricWrapper(metric.getValue()));
- }
- }
- }
-
- // seek the consumer to the initial offsets
- for (KafkaTopicPartitionState<TopicPartition> partition
: subscribedPartitions()) {
- if (partition.isOffsetDefined()) {
- LOG.info("Partition {} has restored
initial offsets {} from checkpoint / savepoint; seeking the consumer " +
- "to position {}",
partition.getKafkaPartitionHandle(), partition.getOffset(),
partition.getOffset() + 1);
-
-
consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
- } else {
- // for partitions that do not have
offsets restored from a checkpoint/savepoint,
- // we need to define our internal
offset state for them using the initial offsets retrieved from Kafka
- // by the KafkaConsumer, so that they
are correctly checkpointed and committed on the next checkpoint
-
- long fetchedOffset =
consumer.position(partition.getKafkaPartitionHandle());
-
- LOG.info("Partition {} has no initial
offset; the consumer has position {}, so the initial offset " +
- "will be set to {}",
partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
-
- // the fetched offset represents the
next record to process, so we need to subtract it by 1
- partition.setOffset(fetchedOffset - 1);
- }
- }
+ final Handover handover = this.handover;
- // from now on, external operations may call the
consumer
- this.consumer = consumer;
+ // kick off the actual Kafka consumer
+ consumerThread.start();
- // main fetch loop
while (running) {
-
- // check if there is something to commit
- final Map<TopicPartition, OffsetAndMetadata>
toCommit = nextOffsetsToCommit.getAndSet(null);
- if (toCommit != null && !commitInProgress) {
- // reset the work-to-be committed, so
we don't repeatedly commit the same
- // also record that a commit is already
in progress
- commitInProgress = true;
- consumer.commitAsync(toCommit,
offsetCommitCallback);
- }
-
- // get the next batch of records
- final ConsumerRecords<byte[], byte[]> records;
- try {
- records = consumer.poll(pollTimeout);
- }
- catch (WakeupException we) {
- continue;
- }
+ // this blocks until we get the next records
+ // it automatically re-throws exceptions
encountered in the fetcher thread
+ final ConsumerRecords<byte[], byte[]> records =
handover.pollNext();
// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition>
partition : subscribedPartitions()) {
-
- List<ConsumerRecord<byte[], byte[]>>
partitionRecords = records.records(partition.getKafkaPartitionHandle());
+
+ List<ConsumerRecord<byte[], byte[]>>
partitionRecords =
+
records.records(partition.getKafkaPartitionHandle());
for (ConsumerRecord<byte[], byte[]>
record : partitionRecords) {
- T value =
deserializer.deserialize(
+
+ final T value =
deserializer.deserialize(
record.key(),
record.value(),
record.topic(),
record.partition(), record.offset());
@@ -279,32 +152,37 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T,
TopicPartition> implem
break;
}
- // emit the actual record. this
also update offset state atomically
+ // emit the actual record. this
also updates offset state atomically
// and deals with timestamps
and watermark generation
emitRecord(value, partition,
record.offset(), record);
}
}
}
- // end main fetch loop
- }
- catch (Throwable t) {
- if (running) {
- running = false;
- errorHandler.reportError(t);
- } else {
- LOG.debug("Stopped ConsumerThread threw
exception", t);
- }
}
finally {
- try {
- consumer.close();
- }
- catch (Throwable t) {
- LOG.warn("Error while closing Kafka 0.9
consumer", t);
- }
+ // this signals the consumer thread that no more work
is to be done
+ consumerThread.shutdown();
+ }
+
+ // on a clean exit, wait for the runner thread
+ try {
+ consumerThread.join();
+ }
+ catch (InterruptedException e) {
+ // may be the result of a wake-up interruption after an
exception.
+ // we ignore this here and only restore the
interruption state
+ Thread.currentThread().interrupt();
}
}
+ @Override
+ public void cancel() {
+ // flag the main thread to exit. A thread interrupt will come
anyways.
+ running = false;
+ handover.close();
+ consumerThread.shutdown();
+ }
+
//
------------------------------------------------------------------------
// The below methods are overridden in the 0.10 fetcher, which
otherwise
// reuses most of the 0.9 fetcher behavior
@@ -320,14 +198,17 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T,
TopicPartition> implem
emitRecord(record, partition, offset);
}
- protected void assignPartitionsToConsumer(KafkaConsumer<?, ?> consumer,
List<TopicPartition> topicPartitions) {
- consumer.assign(topicPartitions);
- }
-
+ /**
+ * Gets the name of this fetcher, for thread naming and logging
purposes.
+ */
protected String getFetcherName() {
return "Kafka 0.9 Fetcher";
}
+ protected KafkaConsumerCallBridge createCallBridge() {
+ return new KafkaConsumerCallBridge();
+ }
+
//
------------------------------------------------------------------------
// Implement Methods of the AbstractFetcher
//
------------------------------------------------------------------------
@@ -355,37 +236,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T,
TopicPartition> implem
}
// record the work to be committed by the main consumer thread
and make sure the consumer notices that
- if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
- LOG.warn("Committing offsets to Kafka takes longer than
the checkpoint interval. " +
- "Skipping commit of previous offsets
because newer complete checkpoint offsets are available. " +
- "This does not compromise Flink's
checkpoint integrity.");
- }
- if (consumer != null) {
- consumer.wakeup();
- }
- }
-
- //
------------------------------------------------------------------------
- // Utilities
- //
------------------------------------------------------------------------
-
- public static List<TopicPartition>
convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
- ArrayList<TopicPartition> result = new
ArrayList<>(partitions.length);
- for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
- result.add(p.getKafkaPartitionHandle());
- }
- return result;
- }
-
- private class CommitCallback implements OffsetCommitCallback {
-
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata>
offsets, Exception ex) {
- commitInProgress = false;
-
- if (ex != null) {
- LOG.warn("Committing offsets to Kafka failed.
This does not compromise Flink's checkpoints.", ex);
- }
- }
+ consumerThread.setOffsetsToCommit(offsetsToCommit);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
new file mode 100644
index 0000000..c17aae6
--- /dev/null
+++
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+
+/**
+ * The ConsumerCallBridge simply calls methods on the {@link KafkaConsumer}.
+ *
+ * This indirection is necessary, because Kafka broke binary compatibility
between 0.9 and 0.10,
+ * for example changing {@code assign(List)} to {@code assign(Collection)}.
+ *
+ * Because of that, we need two versions whose compiled code goes against
different method signatures.
+ * Even though the source of subclasses may look identical, the byte code will
be different, because they
+ * are compiled against different dependencies.
+ */
+public class KafkaConsumerCallBridge {
+
+ public void assignPartitions(KafkaConsumer<?, ?> consumer,
List<TopicPartition> topicPartitions) throws Exception {
+ consumer.assign(topicPartitions);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
new file mode 100644
index 0000000..9cfa840
--- /dev/null
+++
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.metrics.MetricGroup;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The thread that runs the {@link KafkaConsumer}, connecting to the brokers
and polling records.
+ * The thread pushes the data into a {@link Handover} to be picked up by the
fetcher that will
+ * deserialize and emit the records.
+ *
+ * <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to
shut it down.
+ * The Kafka consumer code was found to not always handle interrupts well, and
to even
+ * deadlock in certain situations.
+ *
+ * <p>Implementation Note: This code is written to be reusable in later
versions of the KafkaConsumer.
+ * Because Kafka is not maintaining binary compatibility, we use a "call
bridge" as an indirection
+ * to the KafkaConsumer calls that change signature.
+ */
+public class KafkaConsumerThread extends Thread {
+
+ /** Logger for this consumer */
+ private final Logger log;
+
+ /** The handover of data and exceptions between the consumer thread and
the task thread */
+ private final Handover handover;
+
+ /** The next offsets that the main thread should commit */
+ private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>>
nextOffsetsToCommit;
+
+ /** The configuration for the Kafka consumer */
+ private final Properties kafkaProperties;
+
+ /** The partitions that this consumer reads from */
+ private final KafkaTopicPartitionState<TopicPartition>[]
subscribedPartitions;
+
+ /** We get this from the outside to publish metrics. **/
+ private final MetricGroup kafkaMetricGroup;
+
+ /** The indirections on KafkaConsumer methods, for cases where
KafkaConsumer compatibility is broken */
+ private final KafkaConsumerCallBridge consumerCallBridge;
+
+ /** The maximum number of milliseconds to wait for a fetch batch */
+ private final long pollTimeout;
+
+ /** Flag whether to add Kafka's metrics to the Flink metrics */
+ private final boolean useMetrics;
+
+ /** Reference to the Kafka consumer, once it is created */
+ private volatile KafkaConsumer<byte[], byte[]> consumer;
+
+ /** Flag to mark the main work loop as alive */
+ private volatile boolean running;
+
+ /** Flag tracking whether the latest commit request has completed */
+ private volatile boolean commitInProgress;
+
+
+ public KafkaConsumerThread(
+ Logger log,
+ Handover handover,
+ Properties kafkaProperties,
+ KafkaTopicPartitionState<TopicPartition>[]
subscribedPartitions,
+ MetricGroup kafkaMetricGroup,
+ KafkaConsumerCallBridge consumerCallBridge,
+ String threadName,
+ long pollTimeout,
+ boolean useMetrics) {
+
+ super(threadName);
+ setDaemon(true);
+
+ this.log = checkNotNull(log);
+ this.handover = checkNotNull(handover);
+ this.kafkaProperties = checkNotNull(kafkaProperties);
+ this.subscribedPartitions = checkNotNull(subscribedPartitions);
+ this.kafkaMetricGroup = checkNotNull(kafkaMetricGroup);
+ this.consumerCallBridge = checkNotNull(consumerCallBridge);
+ this.pollTimeout = pollTimeout;
+ this.useMetrics = useMetrics;
+
+ this.nextOffsetsToCommit = new AtomicReference<>();
+ this.running = true;
+ }
+
+ //
------------------------------------------------------------------------
+
+ @Override
+ public void run() {
+ // early exit check
+ if (!running) {
+ return;
+ }
+
+ // this is the means to talk to FlinkKafkaConsumer's main thread
+ final Handover handover = this.handover;
+
+ // This method initializes the KafkaConsumer and guarantees it
is torn down properly.
+ // This is important, because the consumer has multi-threading
issues,
+ // including concurrent 'close()' calls.
+ final KafkaConsumer<byte[], byte[]> consumer;
+ try {
+ consumer = new KafkaConsumer<>(kafkaProperties);
+ }
+ catch (Throwable t) {
+ handover.reportError(t);
+ return;
+ }
+
+ // from here on, the consumer is guaranteed to be closed
properly
+ try {
+ // The callback invoked by Kafka once an offset commit
is complete
+ final OffsetCommitCallback offsetCommitCallback = new
CommitCallback();
+
+ // tell the consumer which partitions to work with
+ consumerCallBridge.assignPartitions(consumer,
convertKafkaPartitions(subscribedPartitions));
+
+ // register Kafka's very own metrics in Flink's metric
reporters
+ if (useMetrics) {
+ // register Kafka metrics to Flink
+ Map<MetricName, ? extends Metric> metrics =
consumer.metrics();
+ if (metrics == null) {
+ // MapR's Kafka implementation returns
null here.
+ log.info("Consumer implementation does
not support metrics");
+ } else {
+ // we have Kafka metrics, register them
+ for (Map.Entry<MetricName, ? extends
Metric> metric: metrics.entrySet()) {
+
kafkaMetricGroup.gauge(metric.getKey().name(), new
KafkaMetricWrapper(metric.getValue()));
+ }
+ }
+ }
+
+ // early exit check
+ if (!running) {
+ return;
+ }
+
+ // seek the consumer to the initial offsets
+ for (KafkaTopicPartitionState<TopicPartition> partition
: subscribedPartitions) {
+ if (partition.isOffsetDefined()) {
+ log.info("Partition {} has restored
initial offsets {} from checkpoint / savepoint; " +
+ "seeking the consumer
to position {}",
+
partition.getKafkaPartitionHandle(), partition.getOffset(),
partition.getOffset() + 1);
+
+
consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+ }
+ else {
+ // for partitions that do not have
offsets restored from a checkpoint/savepoint,
+ // we need to define our internal
offset state for them using the initial offsets retrieved from Kafka
+ // by the KafkaConsumer, so that they
are correctly checkpointed and committed on the next checkpoint
+
+ long fetchedOffset =
consumer.position(partition.getKafkaPartitionHandle());
+
+ log.info("Partition {} has no initial
offset; the consumer has position {}, " +
+ "so the initial offset
will be set to {}",
+
partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
+
+ // the fetched offset represents the
next record to process, so we need to subtract it by 1
+ partition.setOffset(fetchedOffset - 1);
+ }
+ }
+
+ // from now on, external operations may call the
consumer
+ this.consumer = consumer;
+
+ // the latest bulk of records. may carry across the
loop if the thread is woken up
+ // from blocking on the handover
+ ConsumerRecords<byte[], byte[]> records = null;
+
+ // main fetch loop
+ while (running) {
+
+ // check if there is something to commit
+ if (!commitInProgress) {
+ // get and reset the work-to-be
committed, so we don't repeatedly commit the same
+ final Map<TopicPartition,
OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
+
+ if (toCommit != null) {
+ log.debug("Sending async offset
commit request to Kafka broker");
+
+ // also record that a commit is
already in progress
+ // the order here matters!
first set the flag, then send the commit command.
+ commitInProgress = true;
+ consumer.commitAsync(toCommit,
offsetCommitCallback);
+ }
+ }
+
+ // get the next batch of records, unless we did
not manage to hand the old batch over
+ if (records == null) {
+ try {
+ records =
consumer.poll(pollTimeout);
+ }
+ catch (WakeupException we) {
+ continue;
+ }
+ }
+
+ try {
+ handover.produce(records);
+ records = null;
+ }
+ catch (Handover.WakeupException e) {
+ // fall through the loop
+ }
+ }
+ // end main fetch loop
+ }
+ catch (Throwable t) {
+ // let the main thread know and exit
+ // it may be that this exception comes because the main
thread closed the handover, in
+ // which case the below reporting is irrelevant, but
does not hurt either
+ handover.reportError(t);
+ }
+ finally {
+ // make sure the handover is closed if it is not
already closed or has an error
+ handover.close();
+
+ // make sure the KafkaConsumer is closed
+ try {
+ consumer.close();
+ }
+ catch (Throwable t) {
+ log.warn("Error while closing Kafka consumer",
t);
+ }
+ }
+ }
+
+ /**
+ * Shuts this thread down, waking up the thread gracefully if blocked
(without Thread.interrupt() calls).
+ */
+ public void shutdown() {
+ running = false;
+
+ // We cannot call close() on the KafkaConsumer, because it will
actually throw
+ // an exception if a concurrent call is in progress
+
+ // this wakes up the consumer if it is blocked handing over
records
+ handover.wakeupProducer();
+
+ // this wakes up the consumer if it is blocked in a kafka poll
+ if (consumer != null) {
+ consumer.wakeup();
+ }
+ }
+
+ /**
+ * Tells this thread to commit a set of offsets. This method does not
block, the committing
+ * operation will happen asynchronously.
+ *
+ * <p>Only one commit operation may be pending at any time. If the
committing takes longer than
+ * the frequency with which this method is called, then some commits
may be skipped due to being
+ * superseded by newer ones.
+ *
+ * @param offsetsToCommit The offsets to commit
+ */
+ public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata>
offsetsToCommit) {
+ // record the work to be committed by the main consumer thread
and make sure the consumer notices that
+ if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
+ log.warn("Committing offsets to Kafka takes longer than
the checkpoint interval. " +
+ "Skipping commit of previous offsets
because newer complete checkpoint offsets are available. " +
+ "This does not compromise Flink's
checkpoint integrity.");
+ }
+
+ // if the consumer is blocked in a poll() or handover
operation, wake it up to commit soon
+ handover.wakeupProducer();
+ if (consumer != null) {
+ consumer.wakeup();
+ }
+ }
+
+ //
------------------------------------------------------------------------
+ // Utilities
+ //
------------------------------------------------------------------------
+
+ private static List<TopicPartition>
convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
+ ArrayList<TopicPartition> result = new
ArrayList<>(partitions.length);
+ for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
+ result.add(p.getKafkaPartitionHandle());
+ }
+ return result;
+ }
+
+ //
------------------------------------------------------------------------
+
+ private class CommitCallback implements OffsetCommitCallback {
+
+ @Override
+		public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
+ commitInProgress = false;
+
+ if (ex != null) {
+				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+ }
+ }
+ }
+}
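
The setOffsetsToCommit() / run() pair above hinges on a single-slot handoff through nextOffsetsToCommit: the checkpoint thread publishes the newest offsets with getAndSet, and the polling thread drains the slot before issuing commitAsync, so at most one commit is in flight and stale, never-sent offsets are simply superseded. A minimal standalone sketch of that pattern (PendingCommitSlot and the Map<Integer, Long> offsets are hypothetical stand-ins, not part of this patch):

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the single-slot commit handoff used above.
final class PendingCommitSlot {

	// holds at most the newest set of offsets (partition -> offset) still to be committed
	private final AtomicReference<Map<Integer, Long>> nextOffsets = new AtomicReference<>();

	// checkpoint-thread side: returns true if an older, never-sent set was overwritten
	boolean publish(Map<Integer, Long> offsets) {
		return nextOffsets.getAndSet(offsets) != null;
	}

	// polling-thread side: called before each poll; null means nothing to commit right now
	Map<Integer, Long> drain() {
		return nextOffsets.getAndSet(null);
	}
}

Because getAndSet is atomic, no lock is needed between the two threads, and a slow broker can only delay the most recent offsets rather than queueing up commit requests.
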
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 1162599..7a82365 100644
---
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -44,6 +48,7 @@ import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -53,6 +58,7 @@ import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -68,7 +74,7 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
* Unit tests for the {@link Kafka09Fetcher}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest(Kafka09Fetcher.class)
+@PrepareForTest(KafkaConsumerThread.class)
public class Kafka09FetcherTest {
@Test
@@ -124,7 +130,7 @@ public class Kafka09FetcherTest {
this.getClass().getClassLoader(),
true, /* checkpointing */
"task_name",
- mock(MetricGroup.class),
+ new UnregisteredMetricsGroup(),
schema,
new Properties(),
0L,
@@ -174,7 +180,7 @@ public class Kafka09FetcherTest {
// check that there were no errors in the fetcher
final Throwable fetcherError = error.get();
- if (fetcherError != null) {
+		if (fetcherError != null && !(fetcherError instanceof Handover.ClosedException)) {
			throw new Exception("Exception in the fetcher", fetcherError);
}
final Throwable committerError = commitError.get();
@@ -260,7 +266,7 @@ public class Kafka09FetcherTest {
this.getClass().getClassLoader(),
true, /* checkpointing */
"task_name",
- mock(MetricGroup.class),
+ new UnregisteredMetricsGroup(),
schema,
new Properties(),
0L,
@@ -323,8 +329,154 @@ public class Kafka09FetcherTest {
// check that there were no errors in the fetcher
final Throwable caughtError = error.get();
- if (caughtError != null) {
+		if (caughtError != null && !(caughtError instanceof Handover.ClosedException)) {
			throw new Exception("Exception in the fetcher", caughtError);
}
}
+
+ @Test
+ public void testCancellationWhenEmitBlocks() throws Exception {
+
+ // ----- some test data -----
+
+ final String topic = "test-topic";
+ final int partition = 3;
+ final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+ // ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- build a fetcher -----
+
+		BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
+		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition(topic, partition));
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+
+ final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+ sourceContext,
+ topics,
+ null, /* periodic watermark extractor */
+ null, /* punctuated watermark extractor */
+ new TestProcessingTimeService(),
+ 10, /* watermark interval */
+ this.getClass().getClassLoader(),
+ true, /* checkpointing */
+ "task_name",
+ new UnregisteredMetricsGroup(),
+ schema,
+ new Properties(),
+ 0L,
+ false);
+
+
+ // ----- run the fetcher -----
+
+		final AtomicReference<Throwable> error = new AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
+
+ @Override
+ public void run() {
+ try {
+ fetcher.runFetchLoop();
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ fetcherRunner.start();
+
+		// wait until the thread started to emit records to the source context
+ sourceContext.waitTillHasBlocker();
+
+		// now we try to cancel the fetcher, including the interruption usually done on the task thread
+		// once it has finished, there must be no more thread blocked on the source context
+ fetcher.cancel();
+ fetcherRunner.interrupt();
+ fetcherRunner.join();
+
+		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
+ }
+
+	// ------------------------------------------------------------------------
+	//  test utilities
+	// ------------------------------------------------------------------------
+
+	private static final class BlockingSourceContext<T> implements SourceContext<T> {
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final OneShotLatch inBlocking = new OneShotLatch();
+
+ @Override
+ public void collect(T element) {
+ block();
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ block();
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ block();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return new Object();
+ }
+
+ @Override
+ public void close() {}
+
+ public void waitTillHasBlocker() throws InterruptedException {
+ inBlocking.await();
+ }
+
+ public boolean isStillBlocking() {
+ return lock.isLocked();
+ }
+
+		@SuppressWarnings({"InfiniteLoopStatement", "SynchronizationOnLocalVariableOrMethodParameter"})
+ private void block() {
+ lock.lock();
+ try {
+ inBlocking.trigger();
+
+ // put this thread to sleep indefinitely
+ final Object o = new Object();
+ while (true) {
+ synchronized (o) {
+ o.wait();
+ }
+ }
+ }
+ catch (InterruptedException e) {
+				// exit cleanly, simply reset the interruption flag
+ Thread.currentThread().interrupt();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
}
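
The testCancellationWhenEmitBlocks() case above checks the core property of the new thread model: when the emitting thread is stuck in the source context, cancel() plus an interrupt from the task thread must still let it exit. As a rough, self-contained illustration of that cancel-then-interrupt-then-join sequence (BlockedWorker is a made-up stand-in, not Flink code):

// Hypothetical stand-in for a fetcher whose emit call can block indefinitely.
final class BlockedWorker implements Runnable {

	private volatile boolean running = true;

	void cancel() {
		running = false;
	}

	@Override
	public void run() {
		while (running) {
			try {
				Thread.sleep(1_000); // stands in for a blocking emit / handover
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt(); // restore the flag, then re-check 'running'
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {
		BlockedWorker worker = new BlockedWorker();
		Thread runner = new Thread(worker, "worker");
		runner.start();

		worker.cancel();     // 1) stop the loop
		runner.interrupt();  // 2) wake the thread if it is blocked
		runner.join();       // 3) with the flag already cleared, this must not hang
	}
}
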
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
deleted file mode 100644
index c1b21b7..0000000
---
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetention09ITCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class KafkaShortRetention09ITCase extends KafkaShortRetentionTestBase {
-
- @Test(timeout=60000)
- public void testAutoOffsetReset() throws Exception {
- runAutoOffsetResetTest();
- }
-
- @Test(timeout=60000)
- public void testAutoOffsetResetNone() throws Exception {
- runFailOnAutoOffsetResetNone();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
new file mode 100644
index 0000000..25040eb
--- /dev/null
+++
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/HandoverTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.streaming.connectors.kafka.internal.Handover.WakeupException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link Handover} between Kafka Consumer Thread and the fetcher's main thread.
+ */
+public class HandoverTest {
+
+	// ------------------------------------------------------------------------
+	//  test produce / consumer
+	// ------------------------------------------------------------------------
+
+ @Test
+ public void testWithVariableProducer() throws Exception {
+ runProducerConsumerTest(500, 2, 0);
+ }
+
+ @Test
+ public void testWithVariableConsumer() throws Exception {
+ runProducerConsumerTest(500, 0, 2);
+ }
+
+ @Test
+ public void testWithVariableBoth() throws Exception {
+ runProducerConsumerTest(500, 2, 2);
+ }
+
+	// ------------------------------------------------------------------------
+	//  test error propagation
+	// ------------------------------------------------------------------------
+
+ @Test
+ public void testPublishErrorOnEmptyHandover() throws Exception {
+ final Handover handover = new Handover();
+
+ Exception error = new Exception();
+ handover.reportError(error);
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Exception e) {
+ assertEquals(error, e);
+ }
+ }
+
+ @Test
+ public void testPublishErrorOnFullHandover() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+
+ IOException error = new IOException();
+ handover.reportError(error);
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Exception e) {
+ assertEquals(error, e);
+ }
+ }
+
+ @Test
+ public void testExceptionMarksClosedOnEmpty() throws Exception {
+ final Handover handover = new Handover();
+
+ IllegalStateException error = new IllegalStateException();
+ handover.reportError(error);
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testExceptionMarksClosedOnFull() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+
+ LinkageError error = new LinkageError();
+ handover.reportError(error);
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+	// ------------------------------------------------------------------------
+	//  test closing behavior
+	// ------------------------------------------------------------------------
+
+ @Test
+ public void testCloseEmptyForConsumer() throws Exception {
+ final Handover handover = new Handover();
+ handover.close();
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCloseFullForConsumer() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+ handover.close();
+
+ try {
+ handover.pollNext();
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCloseEmptyForProducer() throws Exception {
+ final Handover handover = new Handover();
+ handover.close();
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testCloseFullForProducer() throws Exception {
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+ handover.close();
+
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.ClosedException e) {
+ // expected
+ }
+ }
+
+	// ------------------------------------------------------------------------
+	//  test wake up behavior
+	// ------------------------------------------------------------------------
+
+ @Test
+ public void testWakeupDoesNotWakeWhenEmpty() throws Exception {
+ Handover handover = new Handover();
+ handover.wakeupProducer();
+
+ // produce into a woken but empty handover
+ try {
+ handover.produce(createTestRecords());
+ }
+ catch (Handover.WakeupException e) {
+ fail();
+ }
+
+		// handover now has records, next time we wakeup and produce it needs
+		// to throw an exception
+ handover.wakeupProducer();
+ try {
+ handover.produce(createTestRecords());
+ fail("should throw an exception");
+ }
+ catch (Handover.WakeupException e) {
+ // expected
+ }
+
+ // empty the handover
+ assertNotNull(handover.pollNext());
+
+ // producing into an empty handover should work
+ try {
+ handover.produce(createTestRecords());
+ }
+ catch (Handover.WakeupException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testWakeupWakesOnlyOnce() throws Exception {
+ // create a full handover
+ final Handover handover = new Handover();
+ handover.produce(createTestRecords());
+
+ handover.wakeupProducer();
+
+ try {
+ handover.produce(createTestRecords());
+ fail();
+ } catch (WakeupException e) {
+ // expected
+ }
+
+ CheckedThread producer = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ handover.produce(createTestRecords());
+ }
+ };
+ producer.start();
+
+ // the producer must go blocking
+ producer.waitUntilThreadHoldsLock(10000);
+
+ // release the thread by consuming something
+ assertNotNull(handover.pollNext());
+ producer.sync();
+ }
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private void runProducerConsumerTest(int numRecords, int maxProducerDelay, int maxConsumerDelay) throws Exception {
+		// generate test data
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final ConsumerRecords<byte[], byte[]>[] data = new ConsumerRecords[numRecords];
+ for (int i = 0; i < numRecords; i++) {
+ data[i] = createTestRecords();
+ }
+
+ final Handover handover = new Handover();
+
+		ProducerThread producer = new ProducerThread(handover, data, maxProducerDelay);
+		ConsumerThread consumer = new ConsumerThread(handover, data, maxConsumerDelay);
+
+ consumer.start();
+ producer.start();
+
+ // sync first on the consumer, so it propagates assertion errors
+ consumer.sync();
+ producer.sync();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static ConsumerRecords<byte[], byte[]> createTestRecords() {
+ return mock(ConsumerRecords.class);
+ }
+
+	// ------------------------------------------------------------------------
+
+ private static abstract class CheckedThread extends Thread {
+
+ private volatile Throwable error;
+
+ public abstract void go() throws Exception;
+
+ @Override
+ public void run() {
+ try {
+ go();
+ }
+ catch (Throwable t) {
+ error = t;
+ }
+ }
+
+ public void sync() throws Exception {
+ join();
+ if (error != null) {
+				ExceptionUtils.rethrowException(error, error.getMessage());
+ }
+ }
+
+		public void waitUntilThreadHoldsLock(long timeoutMillis) throws InterruptedException, TimeoutException {
+			final long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
+
+			while (!isBlockedOrWaiting() && (System.nanoTime() < deadline)) {
+ Thread.sleep(1);
+ }
+
+ if (!isBlockedOrWaiting()) {
+ throw new TimeoutException();
+ }
+ }
+
+ private boolean isBlockedOrWaiting() {
+ State state = getState();
+			return state == State.BLOCKED || state == State.WAITING || state == State.TIMED_WAITING;
+ }
+ }
+
+ private static class ProducerThread extends CheckedThread {
+
+ private final Random rnd = new Random();
+ private final Handover handover;
+ private final ConsumerRecords<byte[], byte[]>[] data;
+ private final int maxDelay;
+
+		private ProducerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+ this.handover = handover;
+ this.data = data;
+ this.maxDelay = maxDelay;
+ }
+
+ @Override
+ public void go() throws Exception {
+ for (ConsumerRecords<byte[], byte[]> rec : data) {
+ handover.produce(rec);
+
+ if (maxDelay > 0) {
+ int delay = rnd.nextInt(maxDelay);
+ Thread.sleep(delay);
+ }
+ }
+ }
+ }
+
+ private static class ConsumerThread extends CheckedThread {
+
+ private final Random rnd = new Random();
+ private final Handover handover;
+ private final ConsumerRecords<byte[], byte[]>[] data;
+ private final int maxDelay;
+
+		private ConsumerThread(Handover handover, ConsumerRecords<byte[], byte[]>[] data, int maxDelay) {
+ this.handover = handover;
+ this.data = data;
+ this.maxDelay = maxDelay;
+ }
+
+ @Override
+ public void go() throws Exception {
+ for (ConsumerRecords<byte[], byte[]> rec : data) {
+				ConsumerRecords<byte[], byte[]> next = handover.pollNext();
+
+ assertEquals(rec, next);
+
+ if (maxDelay > 0) {
+ int delay = rnd.nextInt(maxDelay);
+ Thread.sleep(delay);
+ }
+ }
+ }
+ }
+}
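
These tests pin down the Handover contract that the consumer thread and the fetcher's main thread rely on: produce() blocks while the single slot is full, pollNext() blocks while it is empty, reportError() makes the next pollNext() rethrow the reported error, and close() turns both sides into ClosedException. A rough consumer-side usage sketch against that API (HandoverDrainer and emit() are hypothetical; the Handover calls are the ones exercised above):

import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Hypothetical main-thread loop draining the Handover until it is closed.
final class HandoverDrainer {

	private final Handover handover = new Handover();

	Handover handover() {
		return handover; // the producing thread calls produce() / reportError() on this
	}

	void drain() throws Exception {
		try {
			while (true) {
				// blocks until the producer hands over a batch; rethrows reported errors
				ConsumerRecords<byte[], byte[]> batch = handover.pollNext();
				emit(batch);
			}
		}
		catch (Handover.ClosedException e) {
			// normal shutdown path: the handover was closed
		}
	}

	// hypothetical emit step, stands in for deserializing and collecting the records
	private void emit(ConsumerRecords<byte[], byte[]> batch) {
		// ...
	}
}
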
http://git-wip-us.apache.org/repos/asf/flink/blob/a66e7ad1/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 5c03b78..dccf698 100644
---
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -122,6 +122,7 @@ public class KafkaShortRetentionTestBase implements
Serializable {
*
*/
private static boolean stopProducer = false;
+
public void runAutoOffsetResetTest() throws Exception {
final String topic = "auto-offset-reset-test";