rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r816108391



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If all retries are exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code maxRetries} is set to 0, the task will be
+     * executed exactly once.  If {@code maxRetries} is set to ,{@code n} the callable will be executed at

Review comment:
       Nit:
   ```suggestion
        * executed exactly once.  If {@code maxRetries} is set to {@code n}, the callable will be executed at
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -319,9 +319,18 @@ private void poll(long timeoutMs) {
         }
     }
 
-    private void readToLogEnd() {
+    /**
+     * This method finds the end offsets using the {@code listOffsets()} method of the admin client.
+     * As the {@code listOffsets()} method might throw a {@link RetriableException}, the {@code shouldRetry}
+     * flag enables retry, upon catching such exception, if it is set to {@code True}.
+     *
+     * @param shouldRetry Boolean flag to enable retry for the admin client {@code listOffsets()} call.
+     * @see TopicAdmin#retryEndOffsets
+     */
+

Review comment:
       Nit:
   ```suggestion
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If all retries are exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * <p>If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable the function to execute.
+     * @param maxRetries maximum number of retries; must be 0 or more
+     * @param retryBackoffMs the number of milliseconds to delay upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException} before retrying again; must be 0 or more
+     *

Review comment:
       @philipnee can you please correct this spacing to reflect the project 
standards? Thanks!
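   
   Separately, here's a quick usage sketch of the semantics described in the javadoc above; the call site, the `endOffsets` call, and the exact parameter types are assumptions for illustration rather than code from this PR:
   ```java
   // With maxRetries = 3 the callable is attempted at most 3 + 1 = 4 times,
   // sleeping retryBackoffMs = 100 ms between attempts that throw a RetriableException.
   Map<TopicPartition, Long> offsets = RetryUtil.retry(
           () -> admin.endOffsets(partitions),  // may throw a RetriableException
           3,                                   // maxRetries
           100);                                // retryBackoffMs
   ```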

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -124,6 +124,9 @@ public String toString() {
     public static final int NO_PARTITIONS = -1;
     public static final short NO_REPLICATION_FACTOR = -1;
 
+    private static final String DEFAULT_ADMIN_CLIENT_RETRIES = "10";
+    private static final String DEFAULT_ADMIN_CLIENT_BACKOFF_MS = "100";
+

Review comment:
       Okay, so I was verifying what these defaults really were on the 
`AdminClientConfig`, and I noticed that 
[AdminClientConfig.RETRIES_CONFIG](https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L173-L178)
 defaults to `Int.MAX_VALUE`. I did a bit of research, and found 
[KIP-533](https://cwiki.apache.org/confluence/display/KAFKA/KIP-533%3A+Add+default+api+timeout+to+AdminClient)
 changed the defaults, and I don't know that we can/should count on 
`AdminClientConfig.RETRIES`.
   
   I consulted with @kkonstantine, and after a bit of discussion we agree that 
it would be better to not make these configurable and to instead define the 
maximum duration to try to read the offsets as a constant (e.g., 15 minutes), 
plus a constant for retry backoff millis (e.g., 10 seconds). The max duration 
is easier to reason about.
   
   The 15 minutes is a balance between retrying long enough to ride out most incidents where metadata propagation takes too long, and failing quickly enough if the worker cannot start up.
   
   Interestingly, I don't think the duration needs to be defined in `TopicAdmin`; it is better defined in `KafkaBasedLog.start()`. That way, the `KafkaBasedLog` passes the duration into the `TopicAdmin.endOffsets(Set<TopicPartition> partitions, Duration timeout)` method, which is the method that calls `RetryUtil.retry(Callable<T> callable, Duration retryTimeout, long retryBackoffMs)`.
   
   This isn't too much different from what you have, but I do think it's cleaner. I don't think we have seen too many retriable exceptions during `KafkaBasedLog.start()`, which means it's kind of an edge case.
   
   WDYT?
   
   
   
   Second, it might be good to add these methods:
   ```
   public Map<TopicPartition, Long> retryEndOffsets(Set<TopicPartition> partitions, Duration timeout) { ... }
   ```
   that retries for the given timeout while using `DEFAULT_ADMIN_CLIENT_BACKOFF_MS` for backoffs. Then, if you change `KafkaBasedLog.readEndOffsets(...)` and `KafkaBasedLog.readToLogEnd(...)` to take a `Duration timeout` rather than a boolean, `KafkaBasedLog.start()` could just call `readToLogEnd(Duration.ofMinutes(15))` while `KafkaBasedLog.WorkThread.run()` could call `readToLogEnd(null)`.
   
   You'd need to refactor your `retry(...)` method a bit to take the `Duration`.
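   
   To make the idea concrete, here is a rough, untested sketch of what a `Duration`-based `retry(...)` could look like; the deadline handling and the meaning of a `null` timeout are assumptions for illustration, not a prescribed implementation:
   ```java
   import org.apache.kafka.common.errors.RetriableException;
   import org.apache.kafka.common.utils.Utils;
   import org.apache.kafka.connect.errors.ConnectException;
   
   import java.time.Duration;
   import java.util.concurrent.Callable;
   
   public class RetryUtil {
       // Sketch only: retries the callable on RetriableException until the timeout elapses.
       public static <T> T retry(Callable<T> callable, Duration retryTimeout, long retryBackoffMs) {
           // A null timeout means "try exactly once", e.g. for KafkaBasedLog.WorkThread.run()
           long deadlineMs = retryTimeout == null
                   ? Long.MIN_VALUE
                   : System.currentTimeMillis() + retryTimeout.toMillis();
           while (true) {
               try {
                   return callable.call();
               } catch (RetriableException e) {
                   if (System.currentTimeMillis() >= deadlineMs) {
                       // Retries exhausted: wrap and rethrow the last retriable exception
                       throw new ConnectException("Timed out while retrying", e);
                   }
                   Utils.sleep(retryBackoffMs);
               } catch (Exception e) {
                   // Non-retriable failures are wrapped and rethrown immediately
                   throw new ConnectException(e);
               }
           }
       }
   }
   ```
   `TopicAdmin.retryEndOffsets(partitions, timeout)` could then delegate to this with `DEFAULT_ADMIN_CLIENT_BACKOFF_MS`, and `KafkaBasedLog.start()` would pass `Duration.ofMinutes(15)` down through `readToLogEnd(...)`.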

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -319,9 +319,18 @@ private void poll(long timeoutMs) {
         }
     }
 
-    private void readToLogEnd() {
+    /**
+     * This method finds the end offsets using the {@code listOffsets()} method of the admin client.
+     * As the {@code listOffsets()} method might throw a {@link RetriableException}, the {@code shouldRetry}
+     * flag enables retry, upon catching such exception, if it is set to {@code True}.

Review comment:
       Suggestions:
   ```suggestion
        * This method finds the end offsets of the Kafka log's topic partitions, optionally retrying
        * if the {@code listOffsets()} method of the admin client throws a {@link RetriableException}.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

