yashmayya commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214147751

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception {
         assertPollMetrics(0);
     }
+    @Test
+    public void testRetriableExceptionInPoll() throws Exception {
+
+        final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class);
+        final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics);

Review Comment:
   Shouldn't we use `MockTime` here so that we can advance the time programmatically on each call to poll and ensure the retries occur as per the deadline?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Looks like this currently only handles `org.apache.kafka.connect.errors.RetriableException`, should we also add `org.apache.kafka.common.errors.RetriableException`? Looks like there's some history here - https://github.com/apache/kafka/pull/6675

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   It looks like the `RetryWithToleranceOperator` currently expects to be used only for per-record operations (conversion, transformation etc.) - [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L217), [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L233) and [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L183) for instance. I think we'll need to make a number of updates there before it can be used in operations such as `SourceTask::poll` and `SinkTask::put`.

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Should we be retrying on `RetriableException`s even if `errors.tolerance` is set to `none`? Looks like that's what is happening here?

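To make the concerns above more concrete, here is a minimal, self-contained sketch of deadline-driven retries that a test can step through with a controllable clock. It is not the Connect implementation and does not use `RetryWithToleranceOperator` or `MockTime`: `RetryPollSketch`, `RetriableFailure`, `Clock`, `FakeClock` and `callWithRetry` are hypothetical names made up for illustration, and the fixed retry delay is a simplifying assumption.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical sketch: none of these types are Kafka Connect classes.
class RetryPollSketch {

    /** Stand-in for a retriable failure; not Connect's RetriableException. */
    static class RetriableFailure extends RuntimeException {
        RetriableFailure(String message) {
            super(message);
        }
    }

    /** Minimal time source so that tests can control the passage of time. */
    interface Clock {
        long milliseconds();

        void sleep(long ms);
    }

    /** Test clock: sleep() advances the current time instead of blocking. */
    static class FakeClock implements Clock {
        private long nowMs = 0;

        @Override
        public long milliseconds() {
            return nowMs;
        }

        @Override
        public void sleep(long ms) {
            nowMs += ms;
        }
    }

    /**
     * Calls the operation until it succeeds or the deadline passes.
     * Only RetriableFailure is retried; any other exception propagates at once.
     * When the deadline is reached, the last failure is rethrown so the caller
     * can fail the task instead of polling forever.
     */
    static <T> T callWithRetry(Callable<T> operation, long timeoutMs, long delayMs, Clock clock)
            throws Exception {
        long deadlineMs = clock.milliseconds() + timeoutMs;
        while (true) {
            try {
                return operation.call();
            } catch (RetriableFailure e) {
                long remainingMs = deadlineMs - clock.milliseconds();
                if (remainingMs <= 0) {
                    throw e; // retries exhausted: propagate to the caller
                }
                // Fixed delay (an assumption), capped at the deadline, at least 1 ms.
                clock.sleep(Math.max(1, Math.min(delayMs, remainingMs)));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        FakeClock clock = new FakeClock();
        int[] attempts = {0};
        // Fails twice with a retriable error, then returns one record.
        Callable<List<String>> flakyPoll = () -> {
            if (++attempts[0] < 3) {
                throw new RetriableFailure("source temporarily unavailable");
            }
            return List.of("record-1");
        };
        List<String> records = callWithRetry(flakyPoll, 100, 15, clock);
        System.out.println(records + " after " + attempts[0] + " attempts, clock at "
                + clock.milliseconds() + " ms");
    }
}
```

Because the clock is injected, a test can assert exactly how many attempts happen before the deadline without real sleeps, which is the kind of check a mock time source would allow in the test above. Whether retries should happen at all under a given tolerance setting is a separate decision that this sketch leaves out.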
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Furthermore, we aren't propagating any exceptions after retry exhaustion (due to timeout) so the task will never be marked as failed and we'll just keep calling `poll` in the execution loop for the worker source task. That definitely doesn't seem like the right thing to do as it defeats the purpose of configuring `errors.retry.timeout`, `errors.retry.delay.max.ms`, `errors.tolerance`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org