yashmayya commented on code in PR #12478:
URL: https://github.com/apache/kafka/pull/12478#discussion_r955201377
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -175,9 +179,8 @@ protected <V> V execAndRetry(Operation<V> operation) throws Exception {
                 log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
                 errorHandlingMetrics.recordFailure();
                 if (checkRetry(startTime)) {
-                    backoff(attempt, deadline);
-                    if (Thread.currentThread().isInterrupted()) {
-                        log.trace("Thread was interrupted. Marking operation as failed.");
+                    if (!backoff(attempt, deadline) || Thread.currentThread().isInterrupted()) {

Review Comment:
   Nice catch, thanks for pointing it out! I actually wasn't sure why the interrupted status of the thread was being checked here earlier.


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -269,7 +289,7 @@ void backoff(int attempt, long deadline) {
             delay = deadline - time.milliseconds();
         }
         log.debug("Sleeping for {} millis", delay);
-        time.sleep(delay);
+        return !exitLatch.await(delay, TimeUnit.MILLISECONDS);

Review Comment:
   I think I get what you're saying - but why can't we simply catch `InterruptedException` here in this method itself and return `true` indicating that backoff can be called again? Then if shutdown hasn't been scheduled yet - i.e. the latch hasn't counted down to 0 yet - the next backoff will be normal; but if shutdown has been scheduled, the next call to `ShutDownLatch::await` will return immediately and `backoff()` will return false, resulting in no further retries.


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -331,6 +351,15 @@ public synchronized Throwable error() {
         return this.context.error();
     }

+    /**
+     * Exit from any currently ongoing retry loop and mark the operation as failed.
+     * This can be called from a separate thread to break out of an infinite retry loop in
+     * {@link #execAndRetry(Operation)}
+     */
+    public void exit() {

Review Comment:
   Makes sense, I like both the suggested renames 👍


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -331,6 +351,15 @@ public synchronized Throwable error() {
         return this.context.error();
     }

+    /**
+     * Exit from any currently ongoing retry loop and mark the operation as failed.

Review Comment:
   Fair point, how about - "This will stop any further retries for operations. This will also mark any ongoing operations that are currently backing off for retry as failed." ? I think the `currently being invoked and do not complete successfully` bit might be a little tricky to interpret and should be covered by the no further retries bit anyway, WDYT?


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -175,9 +179,8 @@ protected <V> V execAndRetry(Operation<V> operation) throws Exception {
                 log.trace("Caught a retriable exception while executing {} operation with {}", context.stage(), context.executingClass());
                 errorHandlingMetrics.recordFailure();
                 if (checkRetry(startTime)) {

Review Comment:
   Agreed. This can definitely be replaced with `time.milliseconds() < deadline` - there are some rudimentary unit tests on `checkRetry()`, although I think we might be okay with losing them.
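For readers following the thread, here is a minimal, self-contained Java sketch (not the PR's actual code) of the latch-based backoff behavior discussed in the second comment above. The field name `exitLatch` and the call `exitLatch.await(delay, TimeUnit.MILLISECONDS)` come from the diff; the class name, the `triggerStop()` method, and the simplified `backoff(delayMs, deadline)` signature are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchBackoffSketch {

    // Counted down by another thread when retries should stop (e.g. during task shutdown).
    private final CountDownLatch exitLatch = new CountDownLatch(1);

    /**
     * Waits before the next retry attempt.
     *
     * @return true if the caller may keep retrying, false if a stop was requested
     *         while backing off (the latch reached zero before the delay elapsed)
     */
    boolean backoff(long delayMs, long deadline) {
        long delay = Math.min(delayMs, Math.max(0, deadline - System.currentTimeMillis()));
        try {
            // await() returns true when the latch hits zero before the timeout,
            // i.e. a stop was requested, so backoff() reports "do not retry".
            return !exitLatch.await(delay, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // The behavior proposed in the thread: treat an interrupt as "may retry again".
            // If a stop has already been requested, the next await() returns immediately
            // and backoff() reports false, so the retry loop still terminates.
            // (Whether to also restore the thread's interrupt flag is left open here.)
            return true;
        }
    }

    // Called from a separate thread to break out of an ongoing retry loop.
    void triggerStop() {
        exitLatch.countDown();
    }
}
```

The point of the latch over a plain sleep is that a second thread can end the wait immediately and make every subsequent `backoff()` call return false at once, which is what lets the operator abandon an otherwise long retry loop during shutdown.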
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org