Repository: kafka
Updated Branches:
  refs/heads/trunk 6eacc0de3 -> 6834b91e3


MINOR: KAFKA-3260 follow up, fix commitRecord calls in tests

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #1080 from ewencp/minor-kafka-3260-followup-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6834b91e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6834b91e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6834b91e

Branch: refs/heads/trunk
Commit: 6834b91e349f2ed4b8ff5c97eb3b0c390bc27648
Parents: 6eacc0d
Author: Ewen Cheslack-Postava <[email protected]>
Authored: Wed Mar 16 08:24:18 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Wed Mar 16 08:24:18 2016 -0700

----------------------------------------------------------------------
 .../connect/runtime/WorkerSourceTaskTest.java   | 25 +++++++-------------
 1 file changed, 8 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6834b91e/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index ece2985..3dd07a6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -133,8 +133,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         final CountDownLatch pollLatch = expectPolls(10);
         // In this test, we don't flush, so nothing goes any further than the offset writer
 
-        expectCommitRecord(10);
-
         sourceTask.stop();
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
@@ -205,8 +203,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         sourceTask.stop();
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
-        
-        expectCommitRecord(1);
 
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
@@ -238,7 +234,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         // We'll wait for some data, then trigger a flush
         final CountDownLatch pollLatch = expectPolls(1);
-        expectCommitRecord(1);
         expectOffsetFlush(true);
 
         sourceTask.stop();
@@ -259,14 +254,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         PowerMock.verifyAll();
     }
-
-    private void expectCommitRecord(int count) throws Exception {
-        for (int i = 0; i < count; i++) {
-            sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
-            EasyMock.expectLastCall();
-        }
-    }
-
+    
     @Test
     public void testSendRecordsConvertsData() throws Exception {
         createWorkerTask();
@@ -277,8 +265,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
 
-        expectCommitRecord(records.size());
-
         PowerMock.replayAll();
 
         Whitebox.setInternalState(workerTask, "toSend", records);
@@ -307,8 +293,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         expectSendRecordOnce(true);
         expectSendRecordOnce(false);
 
-        expectCommitRecord(3);
-
         PowerMock.replayAll();
 
         // Try to send 3, make first pass, second fail. Should save last two
@@ -439,6 +423,13 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         else
             expect.andAnswer(expectResponse);
 
+        // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
+        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
+        if (anyTimes)
+            EasyMock.expectLastCall().anyTimes();
+        else
+            EasyMock.expectLastCall();
+
         return sent;
     }
 

Reply via email to