C0urante commented on code in PR #12615: URL: https://github.com/apache/kafka/pull/12615#discussion_r970065964
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java: ########## @@ -222,25 +224,31 @@ public void testExecAndHandleRetriableErrorWithMaxRetriesExceeded() throws Excep public void execAndHandleRetriableError(long errorRetryTimeout, int numRetriableExceptionsThrown, List<Long> expectedWaits, Exception e, boolean successExpected) throws Exception { MockTime time = new MockTime(0, 0, 0); - CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class); + CountDownLatch exitLatch = mock(CountDownLatch.class); RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(errorRetryTimeout, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch); retryWithToleranceOperator.metrics(errorHandlingMetrics); - EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown); - - if (successExpected) { - EasyMock.expect(mockOperation.call()).andReturn("Success"); - } + when(mockOperation.call()).thenAnswer(new Answer<String>() { + private int count = 0; + + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + if (count++ != numRetriableExceptionsThrown) { + throw e; + } else if (successExpected) { + return "Success"; + } + return null; + } + }); Review Comment: We can simplify this a bit: ```suggestion OngoingStubbing<String> mockOperationCall = when(mockOperation.call()); for (int i = 0; i < numRetriableExceptionsThrown; i++) { mockOperationCall = mockOperationCall.thenThrow(e); } if (successExpected) { mockOperationCall.thenReturn("Success"); } ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java: ########## @@ -109,12 +109,10 @@ public void testDLQConfigWithEmptyTopicName() { ProcessingContext context = processingContext(); - EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andThrow(new RuntimeException()); - 
replay(producer); - - // since topic name is empty, this method should be a NOOP. - // if it attempts to log to the DLQ via the producer, the send mock will throw a RuntimeException. + // since topic name is empty, this method should be a NOOP and producer.send() should + // not be called. deadLetterQueueReporter.report(context); + verify(producer, never()).send(any(), any()); Review Comment: This will lose coverage if we ever start using a different variant of `Producer::send` and forget to update the method signature here (which may happen since the test won't start failing in that case as well). Probably safer to ensure that we have no interactions whatsoever with it: ```suggestion verifyNoInteractions(producer); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java: ########## @@ -78,4 +68,41 @@ public void testGetFutures() { reporter.awaitFutures(topicPartitions); assertTrue(reporter.futures.isEmpty()); } + + @Test + public void testReport() { + initializeReporter(true); + when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null)); + @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> consumerRecord = mock(ConsumerRecord.class); + when(record.originalRecord()).thenReturn(consumerRecord); + reporter.report(record, new Throwable()); + verify(errorReporter).report(any()); + } + + @Test + public void testReportNoToleratedErrors() { + initializeReporter(false); + when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null)); + @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> consumerRecord = mock(ConsumerRecord.class); + when(record.originalRecord()).thenReturn(consumerRecord); + assertThrows(ConnectException.class, () -> reporter.report(record, new Throwable())); + verify(errorReporter).report(any()); + } Review Comment: I like the new coverage from these test cases. They're very similar--can we consolidate? 
```suggestion @Test public void testReport() { testSingleReport(true); } @Test public void testReportNoToleratedErrors() { testSingleReport(false); } private void testSingleReport(boolean errorsTolerated) { initializeReporter(errorsTolerated); when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null)); @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> consumerRecord = mock(ConsumerRecord.class); when(record.originalRecord()).thenReturn(consumerRecord); if (errorsTolerated) { reporter.report(record, new Throwable()); } else { assertThrows(ConnectException.class, () -> reporter.report(record, new Throwable())); } verify(errorReporter).report(any()); } ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java: ########## @@ -250,97 +258,90 @@ public void execAndHandleRetriableError(long errorRetryTimeout, int numRetriable assertTrue(retryWithToleranceOperator.failed()); } - EasyMock.verify(mockOperation); - PowerMock.verifyAll(); + verifyNoMoreInteractions(exitLatch); + verify(mockOperation, times(successExpected ? 
numRetriableExceptionsThrown + 1 : numRetriableExceptionsThrown)).call(); } @Test public void testExecAndHandleNonRetriableError() throws Exception { MockTime time = new MockTime(0, 0, 0); - CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class); + CountDownLatch exitLatch = mock(CountDownLatch.class); RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch); retryWithToleranceOperator.metrics(errorHandlingMetrics); - EasyMock.expect(mockOperation.call()).andThrow(new Exception("Test")).times(1); - - // expect no call to exitLatch.await() which is only called during the retry backoff - - replay(mockOperation, exitLatch); + when(mockOperation.call()).thenThrow(new Exception("Test")); String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class); assertTrue(retryWithToleranceOperator.failed()); assertNull(result); - EasyMock.verify(mockOperation); - PowerMock.verifyAll(); + // expect no call to exitLatch.await() which is only called during the retry backoff + verify(mockOperation, times(1)).call(); Review Comment: Is there any useful difference between this assertion and one without `times(1)`? 
Otherwise, we can simplify (and add some consistency with line 314 below): ```suggestion verify(mockOperation).call(); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java: ########## @@ -78,4 +68,41 @@ public void testGetFutures() { reporter.awaitFutures(topicPartitions); assertTrue(reporter.futures.isEmpty()); } + + @Test + public void testReport() { + initializeReporter(true); + when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null)); + @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> consumerRecord = mock(ConsumerRecord.class); + when(record.originalRecord()).thenReturn(consumerRecord); + reporter.report(record, new Throwable()); + verify(errorReporter).report(any()); + } + + @Test + public void testReportNoToleratedErrors() { + initializeReporter(false); + when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null)); + @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> consumerRecord = mock(ConsumerRecord.class); + when(record.originalRecord()).thenReturn(consumerRecord); + assertThrows(ConnectException.class, () -> reporter.report(record, new Throwable())); + verify(errorReporter).report(any()); + } + + private void initializeReporter(boolean errorsTolerated) { + retryWithToleranceOperator = new RetryWithToleranceOperator( + 5000, + ConnectorConfig.ERRORS_RETRY_MAX_DELAY_DEFAULT, + errorsTolerated ? 
ToleranceType.ALL : ToleranceType.NONE, + Time.SYSTEM + ); + retryWithToleranceOperator.reporters(Collections.singletonList(errorReporter)); + retryWithToleranceOperator.metrics(mock(ErrorHandlingMetrics.class)); Review Comment: Up to you if you'd like to perform this refactoring here or call it out of scope and leave it for later, but I've checked the code base and it seems like metrics for the `RetryWithToleranceOperator` really shouldn't be passed in via setter method and instead should just be a parameter for the constructor. Also, nit: any reason not to instantiate the metrics with an instance variable annotated with `@Mock`? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java: ########## @@ -379,14 +380,12 @@ public void testDefaultConfigs() { assertEquals(configuration.errorRetryTimeout(), ERRORS_RETRY_TIMEOUT_DEFAULT); assertEquals(configuration.errorMaxDelayInMillis(), ERRORS_RETRY_MAX_DELAY_DEFAULT); assertEquals(configuration.errorToleranceType(), ERRORS_TOLERANCE_DEFAULT); - - PowerMock.verifyAll(); } ConnectorConfig config(Map<String, String> connProps) { Map<String, String> props = new HashMap<>(); props.put(ConnectorConfig.NAME_CONFIG, "test"); - props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SinkTask.class.getName()); + props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SinkConnector.class.getName()); Review Comment: Nice eye, good catch! Just out of curiosity, did this cause any new test failures with the transition to Mockito? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java: ########## @@ -497,7 +494,7 @@ public void attempt(int attempt) { private static class ExceptionThrower implements Operation<Object> { - private Exception e; + private final Exception e; Review Comment: How about we remove this class entirely and replace it with a lambda? 
This is the only change that's necessary AFAICT (around line 195): ```java private void testHandleExceptionInStage(Stage type, Exception ex) { RetryWithToleranceOperator retryWithToleranceOperator = setupExecutor(); Operation<?> operation = () -> { throw ex; }; retryWithToleranceOperator.execute(operation, type, RetryWithToleranceOperatorTest.class); assertTrue(retryWithToleranceOperator.failed()); } ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java: ########## @@ -250,97 +258,90 @@ public void execAndHandleRetriableError(long errorRetryTimeout, int numRetriable assertTrue(retryWithToleranceOperator.failed()); } - EasyMock.verify(mockOperation); - PowerMock.verifyAll(); + verifyNoMoreInteractions(exitLatch); + verify(mockOperation, times(successExpected ? numRetriableExceptionsThrown + 1 : numRetriableExceptionsThrown)).call(); Review Comment: Nice, thank you for preserving the guarantees we have about the number of times we perform the operation 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org