Repository: kafka
Updated Branches:
  refs/heads/trunk f676cfeb8 -> cfc324333
KAFKA-3290: fix transient test failures in WorkerSourceTaskTest Author: Jason Gustafson <[email protected]> Reviewers: Gwen Shapira Closes #998 from hachikuji/KAFKA-3290 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cfc32433 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cfc32433 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cfc32433 Branch: refs/heads/trunk Commit: cfc324333fa13e06ec7ac5ef3a09d8a6b6b54485 Parents: f676cfe Author: Jason Gustafson <[email protected]> Authored: Wed Mar 2 17:22:14 2016 -0800 Committer: Gwen Shapira <[email protected]> Committed: Wed Mar 2 17:22:14 2016 -0800 ---------------------------------------------------------------------- .../apache/kafka/connect/runtime/WorkerTask.java | 3 +++ .../connect/runtime/WorkerSourceTaskTest.java | 19 ++++++++++--------- 2 files changed, 13 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/cfc32433/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index cc69c0f..ff2bb6f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -145,6 +145,9 @@ abstract class WorkerTask implements Runnable { } catch (Throwable t) { if (!cancelled.get()) lifecycleListener.onFailure(id, t); + + if (t instanceof Error) + throw t; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/cfc32433/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java 
---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 8fb8bb5..14c0c6e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -17,11 +17,11 @@ package org.apache.kafka.connect.runtime; -import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; @@ -60,9 +60,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; @RunWith(PowerMockRunner.class) public class WorkerSourceTaskTest extends ThreadedTest { @@ -200,8 +199,6 @@ public class WorkerSourceTaskTest extends ThreadedTest { final CountDownLatch pollLatch = expectPolls(1); expectOffsetFlush(true); - sourceTask.commit(); - EasyMock.expectLastCall(); sourceTask.stop(); EasyMock.expectLastCall(); expectOffsetFlush(true); @@ -235,11 +232,11 @@ public class WorkerSourceTaskTest extends ThreadedTest { // We'll wait for some data, then trigger a flush final CountDownLatch pollLatch = expectPolls(1); - expectOffsetFlush(false); + expectOffsetFlush(true); sourceTask.stop(); EasyMock.expectLastCall(); - 
expectOffsetFlush(true); + expectOffsetFlush(false); statusListener.onShutdown(taskId); EasyMock.expectLastCall(); @@ -249,7 +246,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { workerTask.initialize(EMPTY_TASK_PROPS); executor.submit(workerTask); awaitPolls(pollLatch); - assertFalse(workerTask.commitOffsets()); + assertTrue(workerTask.commitOffsets()); workerTask.stop(); assertEquals(true, workerTask.awaitStop(1000)); @@ -319,9 +316,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); EasyMock.expectLastCall(); sourceTask.start(EMPTY_TASK_PROPS); - statusListener.onStartup(taskId); EasyMock.expectLastCall(); + statusListener.onStartup(taskId); EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() { @Override public Object answer() throws Throwable { @@ -330,8 +327,10 @@ public class WorkerSourceTaskTest extends ThreadedTest { return null; } }); + sourceTask.stop(); EasyMock.expectLastCall(); + expectOffsetFlush(true); PowerMock.replayAll(); @@ -450,6 +449,8 @@ public class WorkerSourceTaskTest extends ThreadedTest { IExpectationSetters<Void> futureGetExpect = EasyMock.expect( flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class))); if (succeed) { + sourceTask.commit(); + EasyMock.expectLastCall(); futureGetExpect.andReturn(null); } else { futureGetExpect.andThrow(new TimeoutException());
