yashmayya commented on code in PR #12478:
URL: https://github.com/apache/kafka/pull/12478#discussion_r955201377


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -175,9 +179,8 @@ protected <V> V execAndRetry(Operation<V> operation) throws Exception {
                 log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
                 errorHandlingMetrics.recordFailure();
                 if (checkRetry(startTime)) {
-                    backoff(attempt, deadline);
-                    if (Thread.currentThread().isInterrupted()) {
-                        log.trace("Thread was interrupted. Marking operation as failed.");
+                    if (!backoff(attempt, deadline) || Thread.currentThread().isInterrupted()) {

Review Comment:
   Nice catch, thanks for pointing it out! I actually wasn't sure why the 
interrupted status of the thread was being checked here earlier.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -269,7 +289,7 @@ void backoff(int attempt, long deadline) {
             delay = deadline - time.milliseconds();
         }
         log.debug("Sleeping for {} millis", delay);
-        time.sleep(delay);
+        return !exitLatch.await(delay, TimeUnit.MILLISECONDS);

Review Comment:
   I think I get what you're saying, but why can't we simply catch
`InterruptedException` in this method itself and return `true`, indicating
that `backoff()` can be called again? Then, if shutdown hasn't been scheduled
yet (i.e. the latch hasn't counted down to 0), the next backoff will proceed
normally. If shutdown has been scheduled, the next call to `exitLatch.await()`
will return immediately and `backoff()` will return `false`, resulting in no
further retries.
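
To make the suggestion concrete, here is a minimal sketch of the idea, not the PR's actual code. The class name `BackoffSketch`, the simplified `backoff(long)` signature, and the assumption that `exitLatch` is a `java.util.concurrent.CountDownLatch` with a count of 1 are all illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the relevant part of RetryWithToleranceOperator.
class BackoffSketch {
    // Counted down once when shutdown is requested (assumed to mirror exitLatch).
    private final CountDownLatch exitLatch = new CountDownLatch(1);

    /** Returns true if another retry may be attempted, false if shutdown was requested. */
    boolean backoff(long delayMs) {
        try {
            // await() returns true if the latch hit zero, false if the wait timed out.
            return !exitLatch.await(delayMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // The interrupt alone does not end the retry loop. The flag is left
            // cleared on purpose: restoring it would make the next await() throw again.
            return true;
        }
    }

    /** Requests shutdown; any current or future backoff() then returns false promptly. */
    void exit() {
        exitLatch.countDown();
    }

    public static void main(String[] args) {
        BackoffSketch op = new BackoffSketch();
        Thread.currentThread().interrupt();
        System.out.println("after interrupt: " + op.backoff(50));
        op.exit();
        System.out.println("after exit: " + op.backoff(50));
    }
}
```

One trade-off of this shape is that the interrupt is swallowed inside `backoff()`, so a caller that later checks `Thread.currentThread().isInterrupted()` will not see it.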



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -331,6 +351,15 @@ public synchronized Throwable error() {
         return this.context.error();
     }
 
+    /**
+     * Exit from any currently ongoing retry loop and mark the operation as failed.
+     * This can be called from a separate thread to break out of an infinite retry loop in
+     * {@link #execAndRetry(Operation)}
+     */
+    public void exit() {

Review Comment:
   Makes sense, I like both the suggested renames 👍 



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -331,6 +351,15 @@ public synchronized Throwable error() {
         return this.context.error();
     }
 
+    /**
+     * Exit from any currently ongoing retry loop and mark the operation as failed.

Review Comment:
   Fair point. How about: "This will stop any further retries for operations.
This will also mark any ongoing operations that are currently backing off for
retry as failed."?
   
   I think the `currently being invoked and do not complete successfully` bit
might be a little tricky to interpret, and it should be covered by the "no
further retries" bit anyway. WDYT?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -175,9 +179,8 @@ protected <V> V execAndRetry(Operation<V> operation) throws Exception {
                 log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
                 errorHandlingMetrics.recordFailure();
                 if (checkRetry(startTime)) {

Review Comment:
   Agreed. This can definitely be replaced with
`time.milliseconds() < deadline`. There are some rudimentary unit tests on
`checkRetry()`, although I think we might be okay with losing them.
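
As a rough illustration of the inlined check, the sketch below shows a retry loop that compares the clock against a precomputed deadline instead of calling a separate `checkRetry()` helper. The class name, the `LongSupplier` clock standing in for `time.milliseconds()`, and returning `null` on giving up are assumptions made for the example, not the operator's real code.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical, simplified retry loop; not the real execAndRetry().
class DeadlineRetrySketch {
    static <V> V execAndRetry(Supplier<V> operation, LongSupplier clock, long retryTimeoutMs) {
        // Compute the deadline once, up front.
        long deadline = clock.getAsLong() + retryTimeoutMs;
        while (true) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                // Inlined equivalent of the checkRetry() helper: stop once the deadline passes.
                if (!(clock.getAsLong() < deadline)) {
                    return null; // assumption: null signals that the operation failed
                }
                // A real implementation would back off here before the next attempt.
            }
        }
    }

    public static void main(String[] args) {
        String result = execAndRetry(() -> "ok", System::currentTimeMillis, 1_000L);
        System.out.println(result);
    }
}
```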



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

