yashmayya commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1214147751


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java:
##########
@@ -483,6 +494,37 @@ public void testFailureInPoll() throws Exception {
         assertPollMetrics(0);
     }
 
+    @Test
+    public void testRetriableExceptionInPoll() throws Exception {
+
+        final ErrorHandlingMetrics errorHandlingMetrics = mock(ErrorHandlingMetrics.class);
+        final RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(30, 15, ALL, SYSTEM, errorHandlingMetrics);

Review Comment:
   Shouldn't we use `MockTime` here so that we can advance the time programmatically on each call to `poll` and verify that the retries respect the deadline?
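A minimal sketch of the idea, assuming a hand-rolled `FakeClock` in place of Kafka's `MockTime` (which the real test would presumably pass instead of `SYSTEM`) and a hypothetical `DeadlineRetrier` standing in for `RetryWithToleranceOperator`. Because the mocked `poll` advances the clock itself, the number of attempts is fully deterministic and nothing actually sleeps.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for a controllable clock such as Kafka's MockTime:
// time moves only when code explicitly advances it.
final class FakeClock {
    private long nowMs = 0;

    long milliseconds() {
        return nowMs;
    }

    // Sleeping on a fake clock just advances it; nothing actually blocks.
    void sleep(long ms) {
        nowMs += ms;
    }
}

// Hypothetical stand-in for a retriable exception thrown by SourceTask::poll.
final class TransientPollException extends RuntimeException {
    TransientPollException(String message) {
        super(message);
    }
}

// Hypothetical deadline-based retrier standing in for RetryWithToleranceOperator.
final class DeadlineRetrier {
    private final FakeClock clock;
    private final long timeoutMs;
    private final long backoffMs;

    DeadlineRetrier(FakeClock clock, long timeoutMs, long backoffMs) {
        this.clock = clock;
        this.timeoutMs = timeoutMs;
        this.backoffMs = backoffMs;
    }

    <T> T execute(Callable<T> operation) throws Exception {
        long deadline = clock.milliseconds() + timeoutMs;
        while (true) {
            try {
                return operation.call();
            } catch (TransientPollException e) {
                if (clock.milliseconds() >= deadline) {
                    throw e; // deadline reached: surface the last failure
                }
                clock.sleep(backoffMs); // fixed backoff, applied to the fake clock
            }
        }
    }
}
```

With a 30 ms timeout, a 15 ms backoff and each simulated `poll` advancing the clock by 10 ms, the operation runs exactly twice: the second failure lands at 35 ms, past the 30 ms deadline, so the exception is rethrown.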



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }
 
     protected List<SourceRecord> poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
   Looks like this currently only handles `org.apache.kafka.connect.errors.RetriableException`; should we also handle `org.apache.kafka.common.errors.RetriableException`? There's some history here: https://github.com/apache/kafka/pull/6675
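The two classes live in separate hierarchies (the Connect one extends `ConnectException`, the common one extends `ApiException`), so matching one never matches the other. A sketch of a retry predicate that recognises both, using hypothetical local stand-ins for the two Kafka classes and a hypothetical `isRetriable` helper:

```java
// Hypothetical stand-ins for org.apache.kafka.connect.errors.RetriableException
// and org.apache.kafka.common.errors.RetriableException; like the real classes,
// neither extends the other.
class ConnectRetriableException extends RuntimeException {
    ConnectRetriableException(String message) {
        super(message);
    }
}

class CommonRetriableException extends RuntimeException {
    CommonRetriableException(String message) {
        super(message);
    }
}

final class RetriableCheck {
    private RetriableCheck() {
    }

    // Both types must be checked explicitly, just as the removed multi-catch did.
    static boolean isRetriable(Throwable error) {
        return error instanceof ConnectRetriableException
                || error instanceof CommonRetriableException;
    }
}
```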



Review Comment:
   It looks like the `RetryWithToleranceOperator` currently assumes it will only be used for per-record operations (conversion, transformation, etc.); see, for instance, [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L217), [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L233) and [here](https://github.com/apache/kafka/blob/7c3a2846d46f21f2737483eeb7c04e4eee4c2b5f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java#L183). I think we'll need to make a number of updates there before it can be used for operations such as `SourceTask::poll` and `SinkTask::put`.
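One shape such an update could take, sketched with entirely hypothetical types: the error context carries the record as an `Optional`, so reporting code stops assuming a record exists when a batch-level stage such as `TASK_POLL` fails before any record has been produced. The local `Stage` enum is only modelled loosely on Connect's and is not the real type.

```java
import java.util.Optional;

// Local stand-in modelled loosely on Kafka Connect's Stage enum; not the real type.
enum Stage { TASK_POLL, TRANSFORMATION }

// Hypothetical error context: the record is optional because batch-level
// operations such as SourceTask::poll fail before any record exists.
final class OperationContext {
    private final Stage stage;
    private final Optional<Object> record;

    private OperationContext(Stage stage, Optional<Object> record) {
        this.stage = stage;
        this.record = record;
    }

    static OperationContext forRecord(Stage stage, Object record) {
        return new OperationContext(stage, Optional.of(record));
    }

    static OperationContext forBatch(Stage stage) {
        return new OperationContext(stage, Optional.empty());
    }

    // Reporting code must handle the no-record case explicitly.
    String describe() {
        return record
                .map(r -> "stage=" + stage + ", record=" + r)
                .orElse("stage=" + stage + ", no record (batch-level operation)");
    }
}
```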



Review Comment:
   Should we be retrying on `RetriableException`s even if `errors.tolerance` is set to `none`? That looks like what's happening here.
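To make the question concrete, here is one possible policy if the answer is "no", written as a hypothetical decision helper; the enums and `onFailure` are illustrative assumptions, not existing Connect types. With `errors.tolerance=none` the first failure fails the task; with `all`, retriable failures are retried until the deadline and anything else skips the batch.

```java
// Hypothetical stand-in mirroring the two values of errors.tolerance.
enum ToleranceType { NONE, ALL }

// Possible outcomes of a failed poll under this sketch.
enum PollAction { RETRY, SKIP_BATCH, FAIL_TASK }

final class PollFailurePolicy {
    private PollFailurePolicy() {
    }

    // Assumed policy for illustration: retries are gated on errors.tolerance.
    static PollAction onFailure(ToleranceType tolerance, boolean retriable, boolean deadlineExpired) {
        if (tolerance == ToleranceType.NONE) {
            return PollAction.FAIL_TASK; // no retries at all when nothing is tolerated
        }
        if (retriable && !deadlineExpired) {
            return PollAction.RETRY;
        }
        return PollAction.SKIP_BATCH; // tolerated: drop this poll and carry on
    }
}
```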



Review Comment:
   Furthermore, we aren't propagating any exception once the retries are exhausted (due to the timeout), so the task will never be marked as failed and we'll just keep calling `poll` in the worker source task's execution loop. That definitely doesn't seem right, since it defeats the purpose of configuring `errors.retry.timeout`, `errors.retry.delay.max.ms` and `errors.tolerance`.
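A minimal sketch of the behaviour one would expect when nothing is tolerated: once retries are exhausted, the wrapper throws instead of returning `null`, so the exception escapes the execution loop and the task can be failed. All names are hypothetical, and it counts attempts rather than using `errors.retry.timeout` purely to stay clock-free.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a retriable failure thrown by SourceTask::poll.
final class TransientError extends RuntimeException {
    TransientError(String message) {
        super(message);
    }
}

// Hypothetical stand-in for ConnectException: escaping the task's execution
// loop is what lets the framework mark the task as failed.
final class TaskFailedException extends RuntimeException {
    TaskFailedException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class PollWithRetries {
    private PollWithRetries() {
    }

    // Retries poll on TransientError up to maxAttempts. On exhaustion it throws
    // rather than returning null; other exceptions propagate immediately.
    static <T> T pollOrFail(Supplier<T> poll, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        TransientError last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return poll.get();
            } catch (TransientError e) {
                last = e;
            }
        }
        throw new TaskFailedException("poll failed after " + maxAttempts + " attempts", last);
    }
}
```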



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
