Repository: kafka Updated Branches: refs/heads/trunk 37be6d98d -> 6dc974312
KAFKA-2886: Handle sink task rebalance failures by stopping worker task Author: Jason Gustafson <[email protected]> Reviewers: Liquan Pei <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #767 from hachikuji/KAFKA-2886 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6dc97431 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6dc97431 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6dc97431 Branch: refs/heads/trunk Commit: 6dc974312590f089c9e0ac27e8eb9b848caf32b5 Parents: 37be6d9 Author: Jason Gustafson <[email protected]> Authored: Fri Jan 15 09:28:43 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Fri Jan 15 09:28:43 2016 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/connect/sink/SinkTask.java | 7 +- .../kafka/connect/runtime/WorkerSinkTask.java | 44 +++++++-- .../connect/runtime/WorkerSinkTaskThread.java | 34 ++++--- .../connect/runtime/WorkerSinkTaskTest.java | 94 ++++++++++++++++++++ 4 files changed, 151 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6dc97431/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java index 7e793c8..85ce88a 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java @@ -80,7 +80,7 @@ public abstract class SinkTask implements Task { * re-assignment. In partition re-assignment, some new partitions may be assigned to the SinkTask. * The SinkTask needs to create writers and perform necessary recovery for the newly assigned partitions. * This method will be called after partition re-assignment completes and before the SinkTask starts - * fetching data. + * fetching data. Note that any errors raised from this method will cause the task to stop. * @param partitions The list of partitions that are now assigned to the task (may include * partitions previously assigned to the task) */ @@ -88,9 +88,10 @@ public abstract class SinkTask implements Task { } /** - * The SinkTask use this method to close writers and commit offsets for partitions that are + * The SinkTask use this method to close writers and commit offsets for partitions that are no * longer assigned to the SinkTask. This method will be called before a rebalance operation starts - * and after the SinkTask stops fetching data. + * and after the SinkTask stops fetching data. Note that any errors raised from this method will cause + * the task to stop. * @param partitions The list of partitions that were assigned to the consumer on the last * rebalance */ http://git-wip-us.apache.org/repos/asf/kafka/blob/6dc97431/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index a67d0af..f48a734 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -71,6 +71,7 @@ class WorkerSinkTask implements WorkerTask { private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets; private Map<TopicPartition, OffsetAndMetadata> currentOffsets; private boolean pausedForRedelivery; + private RuntimeException rebalanceException; public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig, Converter keyConverter, Converter valueConverter, Time time) { @@ -84,6 +85,7 @@ class WorkerSinkTask implements WorkerTask { this.messageBatch = new ArrayList<>(); this.currentOffsets = new HashMap<>(); this.pausedForRedelivery = false; + this.rebalanceException = null; } @Override @@ -145,7 +147,7 @@ class WorkerSinkTask implements WorkerTask { // Ensure we're in the group so that if start() wants to rewind offsets, it will have an assignment of partitions // to work with. Any rewinding will be handled immediately when polling starts. try { - consumer.poll(0); + pollConsumer(0); } catch (WakeupException e) { log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this); return false; @@ -168,7 +170,7 @@ class WorkerSinkTask implements WorkerTask { } log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); - ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs); + ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs); assert messageBatch.isEmpty() || msgs.isEmpty(); log.trace("{} polling returned {} messages", id, msgs.count()); @@ -237,6 +239,19 @@ class WorkerSinkTask implements WorkerTask { '}'; } + private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) { + ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs); + + // Exceptions raised from the task during a rebalance should be rethrown to stop the worker + if (rebalanceException != null) { + RuntimeException e = rebalanceException; + rebalanceException = null; + throw e; + } + + return msgs; + } + private KafkaConsumer<byte[], byte[]> createConsumer() { // Include any unknown worker configs so consumer configs can be set globally on the worker // and through to the task @@ -332,6 +347,9 @@ class WorkerSinkTask implements WorkerTask { private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + if (rebalanceException != null) + return; + lastCommittedOffsets = new HashMap<>(); currentOffsets = new HashMap<>(); for (TopicPartition tp : partitions) { @@ -365,16 +383,30 @@ class WorkerSinkTask implements WorkerTask { // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon // task start. Since this callback gets invoked during that initial setup before we've started the task, we // need to guard against invoking the user's callback method during that period. - if (started) - task.onPartitionsAssigned(partitions); + if (started) { + try { + task.onPartitionsAssigned(partitions); + } catch (RuntimeException e) { + // The consumer swallows exceptions raised in the rebalance listener, so we need to store + // exceptions and rethrow when poll() returns. + rebalanceException = e; + } + } } @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { if (started) { - task.onPartitionsRevoked(partitions); - commitOffsets(true, -1); + try { + task.onPartitionsRevoked(partitions); + commitOffsets(true, -1); + } catch (RuntimeException e) { + // The consumer swallows exceptions raised in the rebalance listener, so we need to store + // exceptions and rethrow when poll() returns. + rebalanceException = e; + } } + // Make sure we don't have any leftover data since offsets will be reset to committed positions messageBatch.clear(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/6dc97431/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java index b65efa8..93e210a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java @@ -67,11 +67,9 @@ class WorkerSinkTaskThread extends ShutdownableThread { // Maybe commit if (!committing && now >= nextCommit) { - synchronized (this) { - committing = true; - commitSeqno += 1; - commitStarted = now; - } + committing = true; + commitSeqno += 1; + commitStarted = now; task.commitOffsets(false, commitSeqno); nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); } @@ -91,22 +89,20 @@ class WorkerSinkTaskThread extends ShutdownableThread { } public void onCommitCompleted(Throwable error, long seqno) { - synchronized (this) { - if (commitSeqno != seqno) { - log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", - this, - seqno, commitSeqno); + if (commitSeqno != seqno) { + log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", + this, + seqno, commitSeqno); + } else { + if (error != null) { + log.error("Commit of {} offsets threw an unexpected exception: ", task, error); + commitFailures++; } else { - if (error != null) { - log.error("Commit of {} offsets threw an unexpected exception: ", task, error); - commitFailures++; - } else { - log.debug("Finished {} offset commit successfully in {} ms", - task, task.time().milliseconds() - commitStarted); - commitFailures = 0; - } - committing = false; + log.debug("Finished {} offset commit successfully in {} ms", + task, task.time().milliseconds() - commitStarted); + commitFailures = 0; } + committing = false; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6dc97431/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index d5eaace..305a61e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; @@ -55,6 +56,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + @RunWith(PowerMockRunner.class) @PrepareForTest(WorkerSinkTask.class) @PowerMockIgnore("javax.management.*") @@ -156,6 +160,48 @@ public class WorkerSinkTaskTest { PowerMock.verifyAll(); } + @Test + public void testErrorInRebalancePartitionRevocation() throws Exception { + RuntimeException exception = new RuntimeException("Revocation error"); + + expectInitializeTask(); + expectRebalanceRevocationError(exception); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + try { + workerTask.poll(Long.MAX_VALUE); + fail("Poll should have raised the rebalance exception"); + } catch (RuntimeException e) { + assertEquals(exception, e); + } + + PowerMock.verifyAll(); + } + + @Test + public void testErrorInRebalancePartitionAssignment() throws Exception { + RuntimeException exception = new RuntimeException("Assignment error"); + + expectInitializeTask(); + expectRebalanceAssignmentError(exception); + + PowerMock.replayAll(); + + workerTask.start(TASK_PROPS); + workerTask.joinConsumerGroupAndStart(); + try { + workerTask.poll(Long.MAX_VALUE); + fail("Poll should have raised the rebalance exception"); + } catch (RuntimeException e) { + assertEquals(exception, e); + } + + PowerMock.verifyAll(); + } + private void expectInitializeTask() throws Exception { PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); @@ -183,6 +229,54 @@ public class WorkerSinkTaskTest { PowerMock.expectLastCall(); } + private void expectRebalanceRevocationError(RuntimeException e) { + final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); + + sinkTask.onPartitionsRevoked(partitions); + EasyMock.expectLastCall().andThrow(e); + + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + new IAnswer<ConsumerRecords<byte[], byte[]>>() { + @Override + public ConsumerRecords<byte[], byte[]> answer() throws Throwable { + rebalanceListener.getValue().onPartitionsRevoked(partitions); + return ConsumerRecords.empty(); + } + }); + } + + private void expectRebalanceAssignmentError(RuntimeException e) { + final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2); + + sinkTask.onPartitionsRevoked(partitions); + EasyMock.expectLastCall(); + + sinkTask.flush(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject()); + EasyMock.expectLastCall(); + + consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject()); + EasyMock.expectLastCall(); + + workerThread.onCommitCompleted(EasyMock.<Throwable>isNull(), EasyMock.anyLong()); + EasyMock.expectLastCall(); + + EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); + EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); + + sinkTask.onPartitionsAssigned(partitions); + EasyMock.expectLastCall().andThrow(e); + + EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( + new IAnswer<ConsumerRecords<byte[], byte[]>>() { + @Override + public ConsumerRecords<byte[], byte[]> answer() throws Throwable { + rebalanceListener.getValue().onPartitionsRevoked(partitions); + rebalanceListener.getValue().onPartitionsAssigned(partitions); + return ConsumerRecords.empty(); + } + }); + } + private void expectConsumerPoll(final int numMessages) { EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( new IAnswer<ConsumerRecords<byte[], byte[]>>() {
