mukkachaitanya commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1111817237


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1870,7 +1880,16 @@ private Callable<Void> 
getConnectorStoppingCallable(final String connectorName)
         };
     }
 
-    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, 
final String connName) {
+    /**
+     * Request task configs from the connector and write them to the config 
storage in case the configs are detected to
+     * have changed. This method retries infinitely in case of any errors.
+     *
+     * @param initialRequestTime the time in milliseconds when the original 
request was made (i.e. before any retries)
+     * @param connName the name of the connector
+     * @param exponentialBackoff {@link ExponentialBackoff} used to calculate 
the retry backoff duration
+     * @param attempts the number of retry attempts that have been made
+     */
+    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, 
final String connName, ExponentialBackoff exponentialBackoff, int attempts) {

Review Comment:
   I see currently we are always gonna do an ExponentialBackoff. Should we 
simply move the logic to set up the `ExponentialBackoff`in this function? I was 
thinking something like
   ```java
       private void reconfigureConnectorTasksWithRetry(long initialRequestTime, 
final String connName, ExponentialBackoff exponentialBackoff, int attempts) {
           ExponentialBackoff exponentialBackoff = new ExponentialBackoff(
                   RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS,
                   2, RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS,
                   0);
           reconfigureConnectorTasksWithExpontialBackoff(initialRequestTime, 
connName, exponentialBackoff, attempts + 1);
   }
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1870,7 +1880,16 @@ private Callable<Void> 
getConnectorStoppingCallable(final String connectorName)
         };
     }
 
-    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, 
final String connName) {
+    /**
+     * Request task configs from the connector and write them to the config 
storage in case the configs are detected to
+     * have changed. This method retries infinitely in case of any errors.

Review Comment:
   I am curious if there is a way to not do infinite retries. If we are 
actually retrying infinitely, esp in the case of `startConnector` phase, then 
the connector just doesn't have tasks. Is it possible to somehow bubble up 
errors as part of connector (not task) status?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to