C0urante commented on code in PR #12615:
URL: https://github.com/apache/kafka/pull/12615#discussion_r970065964


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -222,25 +224,31 @@ public void 
testExecAndHandleRetriableErrorWithMaxRetriesExceeded() throws Excep
 
     public void execAndHandleRetriableError(long errorRetryTimeout, int 
numRetriableExceptionsThrown, List<Long> expectedWaits, Exception e, boolean 
successExpected) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        CountDownLatch exitLatch = 
PowerMock.createStrictMock(CountDownLatch.class);
+        CountDownLatch exitLatch = mock(CountDownLatch.class);
         RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(errorRetryTimeout, ERRORS_RETRY_MAX_DELAY_DEFAULT, 
ALL, time, new ProcessingContext(), exitLatch);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
-        
EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
-
-        if (successExpected) {
-            EasyMock.expect(mockOperation.call()).andReturn("Success");
-        }
+        when(mockOperation.call()).thenAnswer(new Answer<String>() {
+            private int count = 0;
+
+            @Override
+            public String answer(InvocationOnMock invocation) throws Throwable 
{
+                if (count++ != numRetriableExceptionsThrown) {
+                    throw e;
+                } else if (successExpected) {
+                    return "Success";
+                }
+                return null;
+            }
+        });

Review Comment:
   We can simplify this a bit:
   ```suggestion
           OngoingStubbing<String> mockOperationCall = 
when(mockOperation.call());
           for (int i = 0; i < numRetriableExceptionsThrown; i++) {
               mockOperationCall = mockOperationCall.thenThrow(e);
           }
           if (successExpected) {
               mockOperationCall.thenReturn("Success");
           }
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java:
##########
@@ -109,12 +109,10 @@ public void testDLQConfigWithEmptyTopicName() {
 
         ProcessingContext context = processingContext();
 
-        EasyMock.expect(producer.send(EasyMock.anyObject(), 
EasyMock.anyObject())).andThrow(new RuntimeException());
-        replay(producer);
-
-        // since topic name is empty, this method should be a NOOP.
-        // if it attempts to log to the DLQ via the producer, the send mock 
will throw a RuntimeException.
+        // since topic name is empty, this method should be a NOOP and 
producer.send() should
+        // not be called.
         deadLetterQueueReporter.report(context);
+        verify(producer, never()).send(any(), any());

Review Comment:
   This will lose coverage if we ever start using a different variant of 
`Producer::send` and forget to update the method signature here (which may 
happen since the test won't start failing in that case as well).
   
   Probably safer to ensure that we have no interactions whatsoever with it:
   ```suggestion
                   verifyNoInteractions(producer);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java:
##########
@@ -78,4 +68,41 @@ public void testGetFutures() {
         reporter.awaitFutures(topicPartitions);
         assertTrue(reporter.futures.isEmpty());
     }
+
+    @Test
+    public void testReport() {
+        initializeReporter(true);
+        
when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null));
+        @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> 
consumerRecord = mock(ConsumerRecord.class);
+        when(record.originalRecord()).thenReturn(consumerRecord);
+        reporter.report(record, new Throwable());
+        verify(errorReporter).report(any());
+    }
+
+    @Test
+    public void testReportNoToleratedErrors() {
+        initializeReporter(false);
+        
when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null));
+        @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> 
consumerRecord = mock(ConsumerRecord.class);
+        when(record.originalRecord()).thenReturn(consumerRecord);
+        assertThrows(ConnectException.class, () -> reporter.report(record, new 
Throwable()));
+        verify(errorReporter).report(any());
+    }

Review Comment:
   I like the new coverage from these test cases. They're very similar--can we 
consolidate?
   ```suggestion
       @Test
       public void testReport() {
           testSingleReport(true);
       }
   
       @Test
       public void testReportNoToleratedErrors() {
           testSingleReport(false);
       }
   
       private void testSingleReport(boolean errorsTolerated) {
           initializeReporter(errorsTolerated);
           
when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null));
           @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> 
consumerRecord = mock(ConsumerRecord.class);
           when(record.originalRecord()).thenReturn(consumerRecord);
           if (errorsTolerated) {
               reporter.report(record, new Throwable());
           } else {
               assertThrows(ConnectException.class, () -> 
reporter.report(record, new Throwable()));
           }
           verify(errorReporter).report(any());
       }
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -250,97 +258,90 @@ public void execAndHandleRetriableError(long 
errorRetryTimeout, int numRetriable
             assertTrue(retryWithToleranceOperator.failed());
         }
 
-        EasyMock.verify(mockOperation);
-        PowerMock.verifyAll();
+        verifyNoMoreInteractions(exitLatch);
+        verify(mockOperation, times(successExpected ? 
numRetriableExceptionsThrown + 1 : numRetriableExceptionsThrown)).call();
     }
 
     @Test
     public void testExecAndHandleNonRetriableError() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        CountDownLatch exitLatch = 
PowerMock.createStrictMock(CountDownLatch.class);
+        CountDownLatch exitLatch = mock(CountDownLatch.class);
         RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new 
ProcessingContext(), exitLatch);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
-        EasyMock.expect(mockOperation.call()).andThrow(new 
Exception("Test")).times(1);
-
-        // expect no call to exitLatch.await() which is only called during the 
retry backoff
-
-        replay(mockOperation, exitLatch);
+        when(mockOperation.call()).thenThrow(new Exception("Test"));
 
         String result = 
retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
         assertTrue(retryWithToleranceOperator.failed());
         assertNull(result);
 
-        EasyMock.verify(mockOperation);
-        PowerMock.verifyAll();
+        // expect no call to exitLatch.await() which is only called during the 
retry backoff
+        verify(mockOperation, times(1)).call();

Review Comment:
   Is there any useful difference between this assertion and one without 
`times(1)`? Otherwise, we can simplify (and add some consistency with line 314 
below):
   ```suggestion
           verify(mockOperation).call();
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java:
##########
@@ -78,4 +68,41 @@ public void testGetFutures() {
         reporter.awaitFutures(topicPartitions);
         assertTrue(reporter.futures.isEmpty());
     }
+
+    @Test
+    public void testReport() {
+        initializeReporter(true);
+        
when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null));
+        @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> 
consumerRecord = mock(ConsumerRecord.class);
+        when(record.originalRecord()).thenReturn(consumerRecord);
+        reporter.report(record, new Throwable());
+        verify(errorReporter).report(any());
+    }
+
+    @Test
+    public void testReportNoToleratedErrors() {
+        initializeReporter(false);
+        
when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null));
+        @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> 
consumerRecord = mock(ConsumerRecord.class);
+        when(record.originalRecord()).thenReturn(consumerRecord);
+        assertThrows(ConnectException.class, () -> reporter.report(record, new 
Throwable()));
+        verify(errorReporter).report(any());
+    }
+
+    private void initializeReporter(boolean errorsTolerated) {
+        retryWithToleranceOperator = new RetryWithToleranceOperator(
+                5000,
+                ConnectorConfig.ERRORS_RETRY_MAX_DELAY_DEFAULT,
+                errorsTolerated ? ToleranceType.ALL : ToleranceType.NONE,
+                Time.SYSTEM
+        );
+        
retryWithToleranceOperator.reporters(Collections.singletonList(errorReporter));
+        retryWithToleranceOperator.metrics(mock(ErrorHandlingMetrics.class));

Review Comment:
   Up to you if you'd like to perform this refactoring here or call it out of 
scope and leave it for later, but I've checked the code base and it seems like 
metrics for the `RetryWithToleranceOperator` really shouldn't be passed in via 
setter method and instead should just be a parameter for the constructor.
   
   Also, nit: any reason not to instantiate the metrics with an instance 
variable annotated with `@Mock`?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -379,14 +380,12 @@ public void testDefaultConfigs() {
         assertEquals(configuration.errorRetryTimeout(), 
ERRORS_RETRY_TIMEOUT_DEFAULT);
         assertEquals(configuration.errorMaxDelayInMillis(), 
ERRORS_RETRY_MAX_DELAY_DEFAULT);
         assertEquals(configuration.errorToleranceType(), 
ERRORS_TOLERANCE_DEFAULT);
-
-        PowerMock.verifyAll();
     }
 
     ConnectorConfig config(Map<String, String> connProps) {
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.NAME_CONFIG, "test");
-        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
SinkTask.class.getName());
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
SinkConnector.class.getName());

Review Comment:
   Nice eye, good catch!
   
   Just out of curiosity, did this cause any new test failures with the 
transition to Mockito?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -497,7 +494,7 @@ public void attempt(int attempt) {
 
 
     private static class ExceptionThrower implements Operation<Object> {
-        private Exception e;
+        private final Exception e;

Review Comment:
   How about we remove this class entirely and replace it with a lambda? This 
is the only change that's necessary AFAICT (around line 195):
   
   ```java
   private void testHandleExceptionInStage(Stage type, Exception ex) {
       RetryWithToleranceOperator retryWithToleranceOperator = setupExecutor();
       Operation<?> operation = () -> {
           throw ex;
       };
       retryWithToleranceOperator.execute(operation, type, 
RetryWithToleranceOperatorTest.class);
       assertTrue(retryWithToleranceOperator.failed());
   }
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -250,97 +258,90 @@ public void execAndHandleRetriableError(long 
errorRetryTimeout, int numRetriable
             assertTrue(retryWithToleranceOperator.failed());
         }
 
-        EasyMock.verify(mockOperation);
-        PowerMock.verifyAll();
+        verifyNoMoreInteractions(exitLatch);
+        verify(mockOperation, times(successExpected ? 
numRetriableExceptionsThrown + 1 : numRetriableExceptionsThrown)).call();

Review Comment:
   Nice, thank you for preserving the guarantees we have about the number of 
times we perform the operation 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to