hgeraldino commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1217173565


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -474,129 +454,120 @@ public void testFailureInPollAfterStop() throws 
Exception {
         taskFuture.get();
         assertPollMetrics(0);
 
-        PowerMock.verifyAll();
+        verifyCleanStartup();
+        verify(statusListener).onShutdown(taskId);
+        verify(sourceTask).stop();
+        verifyOffsetFlush(true);
+        verifyClose();
     }
 
     @Test
     public void testPollReturnsNoRecords() throws Exception {
         // Test that the task handles an empty list of records
         createWorkerTask();
 
-        expectCleanStartup();
-
         // We'll wait for some data, then trigger a flush
-        final CountDownLatch pollLatch = expectEmptyPolls(1, new 
AtomicInteger());
+        final CountDownLatch pollLatch = expectEmptyPolls(new AtomicInteger());
         expectEmptyOffsetFlush();
 
-        sourceTask.stop();
-        EasyMock.expectLastCall();
-        expectEmptyOffsetFlush();
-
-        statusListener.onShutdown(taskId);
-        EasyMock.expectLastCall();
-
-        expectClose();
-
-        PowerMock.replayAll();
-
         workerTask.initialize(TASK_CONFIG);
         Future<?> taskFuture = executor.submit(workerTask);
 
-        assertTrue(awaitLatch(pollLatch));
+        ConcurrencyUtils.awaitLatch(pollLatch, POLL_TIMEOUT_MSG);
         assertTrue(workerTask.commitOffsets());
+        verify(offsetWriter).beginFlush(anyLong(), any(TimeUnit.class));
+
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
+        verify(offsetWriter, times(2)).beginFlush(anyLong(), 
any(TimeUnit.class));
+        verifyNoMoreInteractions(offsetWriter);
 
         taskFuture.get();
         assertPollMetrics(0);
 
-        PowerMock.verifyAll();
+        verifyCleanStartup();
+        verify(sourceTask).stop();
+        verify(statusListener).onShutdown(taskId);
+        verifyClose();
     }
 
     @Test
     public void testCommit() throws Exception {
         // Test that the task commits properly when prompted
         createWorkerTask();
 
-        expectCleanStartup();
-
         // We'll wait for some data, then trigger a flush
         final CountDownLatch pollLatch = expectPolls(1);
-        expectOffsetFlush(true);
-
-        offsetWriter.offset(PARTITION, OFFSET);
-        PowerMock.expectLastCall().atLeastOnce();
 
         expectTopicCreation(TOPIC);
+        expectBeginFlush(new Supplier<Boolean>() {
+            Iterator<Boolean> succeed = Arrays.asList(true, false).iterator();
 
-        sourceTask.stop();
-        EasyMock.expectLastCall();
-        expectEmptyOffsetFlush();
-
-        statusListener.onShutdown(taskId);
-        EasyMock.expectLastCall();
-
-        expectClose();
-
-        PowerMock.replayAll();
+            @Override
+            public Boolean get() {
+                return succeed.next();
+            }
+        });

Review Comment:
   neat



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to