Repository: kafka Updated Branches: refs/heads/trunk 6eacc0de3 -> 6834b91e3
MINOR: KAFKA-3260 follow up, fix commitRecord calls in tests Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1080 from ewencp/minor-kafka-3260-followup-tests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6834b91e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6834b91e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6834b91e Branch: refs/heads/trunk Commit: 6834b91e349f2ed4b8ff5c97eb3b0c390bc27648 Parents: 6eacc0d Author: Ewen Cheslack-Postava <[email protected]> Authored: Wed Mar 16 08:24:18 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Wed Mar 16 08:24:18 2016 -0700 ---------------------------------------------------------------------- .../connect/runtime/WorkerSourceTaskTest.java | 25 +++++++------------- 1 file changed, 8 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6834b91e/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index ece2985..3dd07a6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -133,8 +133,6 @@ public class WorkerSourceTaskTest extends ThreadedTest { final CountDownLatch pollLatch = expectPolls(10); // In this test, we don't flush, so nothing goes any further than the offset writer - expectCommitRecord(10); - sourceTask.stop(); EasyMock.expectLastCall(); expectOffsetFlush(true); @@ -205,8 +203,6 @@ public class WorkerSourceTaskTest extends ThreadedTest { sourceTask.stop(); EasyMock.expectLastCall(); expectOffsetFlush(true); - - expectCommitRecord(1); statusListener.onShutdown(taskId); EasyMock.expectLastCall(); @@ -238,7 +234,6 @@ public class WorkerSourceTaskTest extends ThreadedTest { // We'll wait for some data, then trigger a flush final CountDownLatch pollLatch = expectPolls(1); - expectCommitRecord(1); expectOffsetFlush(true); sourceTask.stop(); @@ -259,14 +254,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { PowerMock.verifyAll(); } - - private void expectCommitRecord(int count) throws Exception { - for (int i = 0; i < count; i++) { - sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class)); - EasyMock.expectLastCall(); - } - } - + @Test public void testSendRecordsConvertsData() throws Exception { createWorkerTask(); @@ -277,8 +265,6 @@ public class WorkerSourceTaskTest extends ThreadedTest { Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes(); - expectCommitRecord(records.size()); - PowerMock.replayAll(); Whitebox.setInternalState(workerTask, "toSend", records); @@ -307,8 +293,6 @@ public class WorkerSourceTaskTest extends ThreadedTest { expectSendRecordOnce(true); expectSendRecordOnce(false); - expectCommitRecord(3); - PowerMock.replayAll(); // Try to send 3, make first pass, second fail. Should save last two @@ -439,6 +423,13 @@ public class WorkerSourceTaskTest extends ThreadedTest { else expect.andAnswer(expectResponse); + // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit + sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class)); + if (anyTimes) + EasyMock.expectLastCall().anyTimes(); + else + EasyMock.expectLastCall(); + return sent; }
