yashmayya commented on code in PR #12478:
URL: https://github.com/apache/kafka/pull/12478#discussion_r962110802


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -199,112 +202,156 @@ private RetryWithToleranceOperator setupExecutor() {
 
     @Test
     public void testExecAndHandleRetriableErrorOnce() throws Exception {
-        execAndHandleRetriableError(1, 300, new RetriableException("Test"));
+        execAndHandleRetriableError(6000, 1, Collections.singletonList(300L), 
new RetriableException("Test"), true);
     }
 
     @Test
     public void testExecAndHandleRetriableErrorThrice() throws Exception {
-        execAndHandleRetriableError(3, 2100, new RetriableException("Test"));
+        execAndHandleRetriableError(6000, 3, Arrays.asList(300L, 600L, 1200L), 
new RetriableException("Test"), true);
+    }
+
+    @Test
+    public void testExecAndHandleRetriableErrorWithInfiniteRetries() throws 
Exception {
+        execAndHandleRetriableError(-1, 8, Arrays.asList(300L, 600L, 1200L, 
2400L, 4800L, 9600L, 19200L, 38400L), new RetriableException("Test"), true);
+    }
+
+    @Test
+    public void testExecAndHandleRetriableErrorWithMaxRetriesExceeded() throws 
Exception {
+        execAndHandleRetriableError(6000, 10, Arrays.asList(300L, 600L, 1200L, 
2400L, 1500L), new RetriableException("Test"), false);
     }
 
     @Test
     public void testExecAndHandleNonRetriableErrorOnce() throws Exception {
-        execAndHandleNonRetriableError(1, 0, new Exception("Non Retriable 
Test"));
+        execAndHandleNonRetriableError(1, new Exception("Non Retriable Test"));
     }
 
     @Test
     public void testExecAndHandleNonRetriableErrorThrice() throws Exception {
-        execAndHandleNonRetriableError(3, 0, new Exception("Non Retriable 
Test"));
+        execAndHandleNonRetriableError(3, new Exception("Non Retriable Test"));
     }
 
-    public void execAndHandleRetriableError(int numRetriableExceptionsThrown, 
long expectedWait, Exception e) throws Exception {
+    @Test
+    public void testExitLatch() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time);
+        CountDownLatch exitLatch = 
EasyMock.createStrictMock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(-1, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new 
ProcessingContext(), exitLatch);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        EasyMock.expect(mockOperation.call()).andThrow(new 
RetriableException("test")).anyTimes();
+        EasyMock.expect(exitLatch.await(300, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(300);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(600, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(600);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(1200, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(1200);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(2400, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(2400);
+            retryWithToleranceOperator.triggerStop();
+            return false;
+        });
 
-        
EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
-        EasyMock.expect(mockOperation.call()).andReturn("Success");
-
-        replay(mockOperation);
+        // expect no more calls to exitLatch.await() after 
retryWithToleranceOperator.triggerStop() is called
 
-        String result = 
retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
-        assertFalse(retryWithToleranceOperator.failed());
-        assertEquals("Success", result);
-        assertEquals(expectedWait, time.hiResClockMs());
+        exitLatch.countDown();
+        EasyMock.expectLastCall().once();
 
+        replay(mockOperation, exitLatch);
+        retryWithToleranceOperator.execAndHandleError(mockOperation, 
Exception.class);
+        assertTrue(retryWithToleranceOperator.failed());
+        assertEquals(4500L, time.milliseconds());
         PowerMock.verifyAll();
     }
 
-    public void execAndHandleNonRetriableError(int 
numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception {
+    public void execAndHandleRetriableError(long errorRetryTimeout, int 
numRetriableExceptionsThrown, List<Long> expectedWaits, Exception e, boolean 
successExpected) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time);
+        CountDownLatch exitLatch = 
EasyMock.createStrictMock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(errorRetryTimeout, ERRORS_RETRY_MAX_DELAY_DEFAULT, 
ALL, time, new ProcessingContext(), exitLatch);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
         
EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
         EasyMock.expect(mockOperation.call()).andReturn("Success");
+        for (Long expectedWait : expectedWaits) {
+            EasyMock.expect(exitLatch.await(expectedWait, 
TimeUnit.MILLISECONDS)).andAnswer(() -> {
+                time.sleep(expectedWait);
+                return false;
+            });
+        }
 
-        replay(mockOperation);
+        replay(mockOperation, exitLatch);
 
         String result = 
retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
-        assertTrue(retryWithToleranceOperator.failed());
-        assertNull(result);
-        assertEquals(expectedWait, time.hiResClockMs());
+
+        if (successExpected) {
+            assertFalse(retryWithToleranceOperator.failed());
+            assertEquals("Success", result);
+        } else {
+            assertTrue(retryWithToleranceOperator.failed());
+        }
 
         PowerMock.verifyAll();
     }
 
-    @Test
-    public void testCheckRetryLimit() {
+    public void execAndHandleNonRetriableError(int 
numRetriableExceptionsThrown, Exception e) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(500, 100, NONE, time);
-
-        time.setCurrentTimeMs(100);
-        assertTrue(retryWithToleranceOperator.checkRetry(0));
+        CountDownLatch exitLatch = 
EasyMock.createStrictMock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new 
ProcessingContext(), exitLatch);
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
-        time.setCurrentTimeMs(200);
-        assertTrue(retryWithToleranceOperator.checkRetry(0));
+        
EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
+        EasyMock.expect(mockOperation.call()).andReturn("Success");
 
-        time.setCurrentTimeMs(400);
-        assertTrue(retryWithToleranceOperator.checkRetry(0));
+        // expect no call to exitLatch.await() which is only called during the 
retry backoff
 
-        time.setCurrentTimeMs(499);
-        assertTrue(retryWithToleranceOperator.checkRetry(0));
+        replay(mockOperation, exitLatch);
 
-        time.setCurrentTimeMs(501);
-        assertFalse(retryWithToleranceOperator.checkRetry(0));
+        String result = 
retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
+        assertTrue(retryWithToleranceOperator.failed());
+        assertNull(result);
 
-        time.setCurrentTimeMs(600);
-        assertFalse(retryWithToleranceOperator.checkRetry(0));
+        PowerMock.verifyAll();

Review Comment:
   I've added this verification to `execAndHandleRetriableError` (in the 
success case). But here in `execAndHandleNonRetriableError`, no matter what the 
value for `numNonRetriableExceptionsThrown` is, `mockOperation.call()` will 
only be called once (since the exception isn't retriable) - so we can't verify 
that the expectation that `e` is thrown `numNonRetriableExceptionsThrown` times 
is met. I guess this would've been easier to test if this was migrated to 
Mockito?
   
   Edit: Actually, on second thought - the test where a non-retriable exception 
is thrown more than once is doing basically the same thing as the test where it 
is only thrown once and can probably be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to