C0urante commented on code in PR #12478:
URL: https://github.com/apache/kafka/pull/12478#discussion_r961605579


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -168,22 +170,26 @@ public synchronized <V> V execute(Operation<V> operation, Stage stage, Class<?>
      * @throws Exception rethrow if a non-retriable Exception is thrown by the operation
      */
     protected <V> V execAndRetry(Operation<V> operation) throws Exception {
+        RetriableException lastException = null;
         int attempt = 0;
         long startTime = time.milliseconds();
         long deadline = (errorRetryTimeout >= 0) ? startTime + errorRetryTimeout : Long.MAX_VALUE;
         do {
+            if (stopping) {
+                log.trace("Shutdown has been scheduled. Marking operation as failed.");
+                context.error(new ConnectException(lastException));
+                return null;
+            }

Review Comment:
   It'd be cleaner to put this check inside the `catch` block, so that we don't
have to introduce the `lastException` local variable, which is still null if we
hit the check on the first iteration.
   Thinking something like this:
   ```java
   do {
       try {
           attempt++;
           return operation.call();
       } catch (RetriableException e) {
           log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
           errorHandlingMetrics.recordFailure();
           if (time.milliseconds() < deadline) {
               backoff(attempt, deadline);
               errorHandlingMetrics.recordRetry();
           } else {
               log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
               context.error(e);
               return null;
           }
           if (stopping) {
               log.trace("Shutdown has been scheduled. Marking operation as failed.");
               context.error(e);
               return null;
           }
       } finally {
           context.attempt(attempt);
       }
   } while (true);
   ```
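
For illustration only, here is a minimal, self-contained sketch of the same idea: the stop check lives in the `catch` block, where the exception `e` is always non-null. `StopAwareRetry`, its nested `RetriableException`, the `maxAttempts` parameter (standing in for the deadline and backoff), and the `reportedError` field (standing in for `context.error(...)`) are all hypothetical names for this sketch, not the real Connect classes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class StopAwareRetry {
    /** Hypothetical stand-in for Kafka's RetriableException. */
    public static class RetriableException extends RuntimeException {
        public RetriableException(String message) {
            super(message);
        }
    }

    private volatile boolean stopping = false;
    private Throwable reportedError = null; // stand-in for context.error(...)

    public void triggerStop() {
        stopping = true;
    }

    public Throwable reportedError() {
        return reportedError;
    }

    /** Retries until success, until maxAttempts is reached, or until a stop is requested. */
    public <V> V execAndRetry(Callable<V> operation, int maxAttempts) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                attempt++;
                return operation.call();
            } catch (RetriableException e) {
                // Both exit conditions are checked here, so 'e' is never null
                // and no 'lastException' variable is needed.
                if (attempt >= maxAttempts || stopping) {
                    reportedError = e;
                    return null;
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StopAwareRetry retry = new StopAwareRetry();
        AtomicInteger calls = new AtomicInteger();
        // The operation always fails and requests a stop during its third call.
        String result = retry.execAndRetry(() -> {
            if (calls.incrementAndGet() == 3) {
                retry.triggerStop();
            }
            throw new RetriableException("attempt " + calls.get());
        }, 100);
        System.out.println(result + " after " + calls.get() + " calls, reported: "
                + retry.reportedError().getMessage());
    }
}
```

Non-retriable exceptions still propagate to the caller, since only the hypothetical `RetriableException` is caught.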



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java:
##########
@@ -199,112 +202,156 @@ private RetryWithToleranceOperator setupExecutor() {
 
     @Test
     public void testExecAndHandleRetriableErrorOnce() throws Exception {
-        execAndHandleRetriableError(1, 300, new RetriableException("Test"));
+        execAndHandleRetriableError(6000, 1, Collections.singletonList(300L), new RetriableException("Test"), true);
     }
 
     @Test
     public void testExecAndHandleRetriableErrorThrice() throws Exception {
-        execAndHandleRetriableError(3, 2100, new RetriableException("Test"));
+        execAndHandleRetriableError(6000, 3, Arrays.asList(300L, 600L, 1200L), new RetriableException("Test"), true);
+    }
+
+    @Test
+    public void testExecAndHandleRetriableErrorWithInfiniteRetries() throws Exception {
+        execAndHandleRetriableError(-1, 8, Arrays.asList(300L, 600L, 1200L, 2400L, 4800L, 9600L, 19200L, 38400L), new RetriableException("Test"), true);
+    }
+
+    @Test
+    public void testExecAndHandleRetriableErrorWithMaxRetriesExceeded() throws Exception {
+        execAndHandleRetriableError(6000, 10, Arrays.asList(300L, 600L, 1200L, 2400L, 1500L), new RetriableException("Test"), false);
     }
 
     @Test
     public void testExecAndHandleNonRetriableErrorOnce() throws Exception {
-        execAndHandleNonRetriableError(1, 0, new Exception("Non Retriable Test"));
+        execAndHandleNonRetriableError(1, new Exception("Non Retriable Test"));
     }
 
     @Test
     public void testExecAndHandleNonRetriableErrorThrice() throws Exception {
-        execAndHandleNonRetriableError(3, 0, new Exception("Non Retriable Test"));
+        execAndHandleNonRetriableError(3, new Exception("Non Retriable Test"));
     }
 
-    public void execAndHandleRetriableError(int numRetriableExceptionsThrown, long expectedWait, Exception e) throws Exception {
+    @Test
+    public void testExitLatch() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time);
+        CountDownLatch exitLatch = EasyMock.createStrictMock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(-1, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch);
         retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        EasyMock.expect(mockOperation.call()).andThrow(new RetriableException("test")).anyTimes();
+        EasyMock.expect(exitLatch.await(300, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(300);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(600, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(600);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(1200, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(1200);
+            return false;
+        });
+        EasyMock.expect(exitLatch.await(2400, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            time.sleep(2400);
+            retryWithToleranceOperator.triggerStop();

Review Comment:
   I think it's fine 👍
   I was wondering if we'd need to define a timeout for the test in order to
prevent infinite loops, but since the exit latch is a strict mock, the test
automatically fails if any unexpected wait on it occurs. This is actually
pretty smooth IMO.
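
To illustrate why a strict mock bounds the loop, here is a rough sketch that uses a hand-rolled fake instead of EasyMock. `StrictWaitDemo`, `StrictLatch`, and `retryUntilStopped` are hypothetical names for this example only; the doubling delay starting at 300 ms mirrors the values in the test above, and the real `RetryWithToleranceOperator` logic is not reproduced.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class StrictWaitDemo {
    /** Hypothetical strict fake: only the scripted waits are allowed, in order. */
    public static class StrictLatch {
        private final Deque<Long> expectedWaits = new ArrayDeque<>();
        private final boolean lastWaitStops;

        public StrictLatch(boolean lastWaitStops, Long... waits) {
            this.lastWaitStops = lastWaitStops;
            expectedWaits.addAll(Arrays.asList(waits));
        }

        /** Returns true once the final scripted wait is consumed and a stop was scripted. */
        public boolean await(long millis) {
            Long expected = expectedWaits.poll();
            if (expected == null || expected != millis) {
                // An unscripted wait fails immediately, as a strict mock would.
                throw new AssertionError("Unexpected call: await(" + millis + ")");
            }
            return lastWaitStops && expectedWaits.isEmpty();
        }
    }

    /** Simplified retry loop: waits with a doubling delay until the latch reports a stop. */
    public static int retryUntilStopped(StrictLatch latch) {
        long delay = 300;
        int waits = 0;
        while (true) {
            waits++;
            if (latch.await(delay)) {
                return waits;
            }
            delay *= 2;
        }
    }

    public static void main(String[] args) {
        // Scripted stop on the fourth wait: the loop ends normally.
        System.out.println(retryUntilStopped(new StrictLatch(true, 300L, 600L, 1200L, 2400L)));

        // No stop scripted: the third, unscripted wait throws instead of looping forever.
        try {
            retryUntilStopped(new StrictLatch(false, 300L, 600L));
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In both cases the loop terminates without a test-level timeout, because the number of permitted waits is fixed by the script.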


