Repository: kafka Updated Branches: refs/heads/trunk c0a62b70a -> b75245cfb
MINOR: Wakeups propagated from commitOffsets in WorkerSinkTask should be caught Author: Jason Gustafson <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1907 from hachikuji/catch-wakeup-worker-sink-task Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b75245cf Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b75245cf Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b75245cf Branch: refs/heads/trunk Commit: b75245cfbbefc712103b9329da0f27a205baa6aa Parents: c0a62b7 Author: Jason Gustafson <[email protected]> Authored: Mon Sep 26 14:54:01 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Sep 26 14:54:01 2016 -0700 ---------------------------------------------------------------------- .../kafka/connect/runtime/WorkerSinkTask.java | 84 ++++++++++---------- .../connect/runtime/WorkerSinkTaskTest.java | 36 ++++----- 2 files changed, 60 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b75245cf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 624b032..4181799 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -149,26 +149,39 @@ class WorkerSinkTask extends WorkerTask { } protected void iteration() { - long now = time.milliseconds(); + try { + long now = time.milliseconds(); - // Maybe commit - if (!committing && now >= nextCommit) { - commitOffsets(now, false); - nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); - } + // Maybe commit + if (!committing && now >= nextCommit) { + commitOffsets(now, false); + nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + } - // Check for timed out commits - long commitTimeout = commitStarted + workerConfig.getLong( - WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); - if (committing && now >= commitTimeout) { - log.warn("Commit of {} offsets timed out", this); - commitFailures++; - committing = false; - } + // Check for timed out commits + long commitTimeout = commitStarted + workerConfig.getLong( + WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); + if (committing && now >= commitTimeout) { + log.warn("Commit of {} offsets timed out", this); + commitFailures++; + committing = false; + } - // And process messages - long timeoutMs = Math.max(nextCommit - now, 0); - poll(timeoutMs); + // And process messages + long timeoutMs = Math.max(nextCommit - now, 0); + poll(timeoutMs); + } catch (WakeupException we) { + log.trace("{} consumer woken up", id); + + if (isStopping()) + return; + + if (shouldPause()) { + pauseAll(); + } else if (!pausedForRedelivery) { + resumeAll(); + } + } } private void onCommitCompleted(Throwable error, long seqno) { @@ -211,33 +224,20 @@ class WorkerSinkTask extends WorkerTask { /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ protected void poll(long timeoutMs) { - try { - rewind(); - long retryTimeout = context.timeout(); - if (retryTimeout > 0) { - timeoutMs = Math.min(timeoutMs, retryTimeout); - context.timeout(-1L); - } - - log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); - ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs); - assert messageBatch.isEmpty() || msgs.isEmpty(); - log.trace("{} polling returned {} messages", id, msgs.count()); + rewind(); + long retryTimeout = context.timeout(); + if (retryTimeout > 0) { + timeoutMs = Math.min(timeoutMs, retryTimeout); + context.timeout(-1L); + } - convertMessages(msgs); - deliverMessages(); - } catch (WakeupException we) { - log.trace("{} consumer woken up", id); + log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); + ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs); + assert messageBatch.isEmpty() || msgs.isEmpty(); + log.trace("{} polling returned {} messages", id, msgs.count()); - if (isStopping()) - return; - - if (shouldPause()) { - pauseAll(); - } else if (!pausedForRedelivery) { - resumeAll(); - } - } + convertMessages(msgs); + deliverMessages(); } private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, int seqno) { http://git-wip-us.apache.org/repos/asf/kafka/blob/b75245cf/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index dbb3f8d..ca218c3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -148,7 +148,7 @@ public class WorkerSinkTaskTest { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); - workerTask.poll(Long.MAX_VALUE); + workerTask.iteration(); PowerMock.verifyAll(); } @@ -197,14 +197,14 @@ public class WorkerSinkTaskTest { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); - workerTask.poll(Long.MAX_VALUE); // initial assignment - workerTask.poll(Long.MAX_VALUE); // fetch some data + workerTask.iteration(); // initial assignment + workerTask.iteration(); // fetch some data workerTask.transitionTo(TargetState.PAUSED); - workerTask.poll(Long.MAX_VALUE); // wakeup - workerTask.poll(Long.MAX_VALUE); // now paused + workerTask.iteration(); // wakeup + workerTask.iteration(); // now paused workerTask.transitionTo(TargetState.STARTED); - workerTask.poll(Long.MAX_VALUE); // wakeup - workerTask.poll(Long.MAX_VALUE); // now unpaused + workerTask.iteration(); // wakeup + workerTask.iteration(); // now unpaused PowerMock.verifyAll(); } @@ -241,9 +241,9 @@ public class WorkerSinkTaskTest { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); - workerTask.poll(Long.MAX_VALUE); - workerTask.poll(Long.MAX_VALUE); - workerTask.poll(Long.MAX_VALUE); + workerTask.iteration(); + workerTask.iteration(); + workerTask.iteration(); PowerMock.verifyAll(); } @@ -260,9 +260,9 @@ public class WorkerSinkTaskTest { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); - workerTask.poll(Long.MAX_VALUE); + workerTask.iteration(); try { - workerTask.poll(Long.MAX_VALUE); + workerTask.iteration(); fail("Poll should have raised the rebalance exception"); } catch (RuntimeException e) { assertEquals(exception, e); @@ -283,9 +283,9 @@ public class WorkerSinkTaskTest { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); - workerTask.poll(Long.MAX_VALUE); + workerTask.iteration(); try { - workerTask.poll(Long.MAX_VALUE); + workerTask.iteration(); fail("Poll should have raised the rebalance exception"); } catch (RuntimeException e) { assertEquals(exception, e); @@ -343,8 +343,8 @@ public class WorkerSinkTaskTest { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); - workerTask.poll(Long.MAX_VALUE); // poll for initial assignment - workerTask.poll(Long.MAX_VALUE); // now rebalance with the wakeup triggered + workerTask.iteration(); // poll for initial assignment + workerTask.iteration(); // now rebalance with the wakeup triggered PowerMock.verifyAll(); } @@ -363,7 +363,7 @@ public class WorkerSinkTaskTest { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); - workerTask.poll(Long.MAX_VALUE); + workerTask.iteration(); SinkRecord record = records.getValue().iterator().next(); @@ -391,7 +391,7 @@ public class WorkerSinkTaskTest { workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); - workerTask.poll(Long.MAX_VALUE); + workerTask.iteration(); SinkRecord record = records.getValue().iterator().next();
