Repository: kafka Updated Branches: refs/heads/trunk 8a417c89d -> d1546960d
KAFKA-3935; Fix test_restart_failed_task system test for SinkTasks Fix the test by using a more liberal timeout and forcing more frequent SinkTask.put() calls. Also add some logging to aid future debugging. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]> Closes #1663 from ewencp/kafka-3935-fix-restart-system-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1546960 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1546960 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1546960 Branch: refs/heads/trunk Commit: d1546960de0aa43989680a59c8f6b1ae7cb333e9 Parents: 8a417c8 Author: Ewen Cheslack-Postava <[email protected]> Authored: Tue Jul 26 03:02:02 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Tue Jul 26 03:02:02 2016 +0100 ---------------------------------------------------------------------- .../kafka/connect/tools/MockConnector.java | 7 +++++++ .../apache/kafka/connect/tools/MockSinkTask.java | 19 ++++++++++++++++++- .../kafka/connect/tools/MockSourceTask.java | 9 ++++++++- .../tests/connect/connect_distributed_test.py | 2 +- 4 files changed, 34 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d1546960/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java index 919e896..51bb519 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; @@ -49,6 +51,8 @@ public class MockConnector extends Connector { public static final long DEFAULT_FAILURE_DELAY_MS = 15000; + private static final Logger log = LoggerFactory.getLogger(MockConnector.class); + private Map<String, String> config; private ScheduledExecutorService executor; @@ -69,10 +73,12 @@ public class MockConnector extends Connector { if (delayMsString != null) delayMs = Long.parseLong(delayMsString); + log.debug("Started MockConnector with failure delay of {} ms", delayMs); executor = Executors.newSingleThreadScheduledExecutor(); executor.schedule(new Runnable() { @Override public void run() { + log.debug("Triggering connector failure"); context.raiseError(new RuntimeException()); } }, delayMs, TimeUnit.MILLISECONDS); @@ -86,6 +92,7 @@ public class MockConnector extends Connector { @Override public List<Map<String, String>> taskConfigs(int maxTasks) { + log.debug("Creating single task for MockConnector"); return Collections.singletonList(config); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d1546960/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java index 2e4b35e..b0de58d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java @@ -21,11 +21,14 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Map; public class MockSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(MockSinkTask.class); private String mockMode; private long startTimeMs; @@ -47,6 +50,9 @@ public class MockSinkTask extends SinkTask { this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS; if (delayMsString != null) failureDelayMs = Long.parseLong(delayMsString); + + log.debug("Started MockSinkTask at {} with failure scheduled in {} ms", startTimeMs, failureDelayMs); + setTimeout(); } } @@ -54,8 +60,11 @@ public class MockSinkTask extends SinkTask { public void put(Collection<SinkRecord> records) { if (MockConnector.TASK_FAILURE.equals(mockMode)) { long now = System.currentTimeMillis(); - if (now > startTimeMs + failureDelayMs) + if (now > startTimeMs + failureDelayMs) { + log.debug("Triggering sink task failure"); throw new RuntimeException(); + } + setTimeout(); } } @@ -68,4 +77,12 @@ public class MockSinkTask extends SinkTask { public void stop() { } + + private void setTimeout() { + // Set a reasonable minimum delay. Since this mock task may not actually consume any data from Kafka, it may only + // see put() calls triggered by wakeups for offset commits. To make sure we aren't tied to the offset commit + // interval, we force a wakeup every 250ms or after the failure delay, whichever is smaller. This is not overly + // aggressive but ensures any scheduled tasks this connector performs are reasonably close to the target time. + context.timeout(Math.min(failureDelayMs, 250)); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d1546960/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java index eb896af..d7288f8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java @@ -19,12 +19,15 @@ package org.apache.kafka.connect.tools; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; import java.util.Map; public class MockSourceTask extends SourceTask { + private static final Logger log = LoggerFactory.getLogger(MockSourceTask.class); private String mockMode; private long startTimeMs; @@ -46,6 +49,8 @@ public class MockSourceTask extends SourceTask { this.failureDelayMs = MockConnector.DEFAULT_FAILURE_DELAY_MS; if (delayMsString != null) failureDelayMs = Long.parseLong(delayMsString); + + log.debug("Started MockSourceTask at {} with failure scheduled in {} ms", startTimeMs, failureDelayMs); } } @@ -53,8 +58,10 @@ public class MockSourceTask extends SourceTask { public List<SourceRecord> poll() throws InterruptedException { if (MockConnector.TASK_FAILURE.equals(mockMode)) { long now = System.currentTimeMillis(); - if (now > startTimeMs + failureDelayMs) + if (now > startTimeMs + failureDelayMs) { + log.debug("Triggering source task failure"); throw new RuntimeException(); + } } return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d1546960/tests/kafkatest/tests/connect/connect_distributed_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index b9757ba..1902c59 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -171,7 +171,7 @@ class ConnectDistributedTest(Test): connector.start() task_id = 0 - wait_until(lambda: self.task_is_failed(connector, task_id), timeout_sec=15, + wait_until(lambda: self.task_is_failed(connector, task_id), timeout_sec=20, err_msg="Failed to see task transition to the FAILED state") self.cc.restart_task(connector.name, task_id)
