rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815104923



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -319,9 +319,18 @@ private void poll(long timeoutMs) {
         }
     }
 
-    private void readToLogEnd() {
+    /**
+     * This method finds the end offsets using the <code>listOffsets</code> 
method of the admin client.
+     * As the <code>listOffsets</code> method might throw a {@link 
RetriableException}, the <code>shouldRetry</code>
+     * flag enables retry, upon catching such exception, if it is set to 
<code>True</code>.
+     *
+     * @param shouldRetry Boolean flag to enable retry for the admin client 
<code>listOffsets</code> call.

Review comment:
       Maybe add:
   ```suggestion
        * @param shouldRetry Boolean flag to enable retry for the admin client 
<code>listOffsets</code> call.
        * @see TopicAdmin#retryEndOffsets
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -319,9 +319,18 @@ private void poll(long timeoutMs) {
         }
     }
 
-    private void readToLogEnd() {
+    /**
+     * This method finds the end offsets using the <code>listOffsets</code> 
method of the admin client.
+     * As the <code>listOffsets</code> method might throw a {@link 
RetriableException}, the <code>shouldRetry</code>
+     * flag enables retry, upon catching such exception, if it is set to 
<code>True</code>.
+     *
+     * @param shouldRetry Boolean flag to enable retry for the admin client 
<code>listOffsets</code> call.
+     */
+
+    private void readToLogEnd(boolean shouldRetry) {
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
+        // it will subsequently invoke the listOffsets call here

Review comment:
       I don't think this comment adds much value, especially because 
"subsequently" is ambiguous. IMO the method call stands on its own.
   ```suggestion
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+     * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If 
<code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to 
<code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in 
between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon 
receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long 
retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying 
automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxRetries - attempt, 
e.getMessage());
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            } catch (Exception e) {
+                log.warn("Non-retriable exception caught. Re-throwing. Reason: 
{}, {}", e.getClass(), e.getMessage());
+                throw e;
+            }
+            Utils.sleep(retryBackoffMs);
+        }
+
+        throw new ConnectException("Fail to retry the task after " + 
maxRetries + " attempts.  Reason: " + lastError, lastError);

Review comment:
       Suggestion to improve the message:
   ```suggestion
           throw new ConnectException("Fail to retry the task after " + 
maxAttempts + " attempts.  Reason: " + lastError.getMessage(), lastError);
   ```
   
   And, if `maxRetries == 0` should we just call the function without any 
special handling, since we're not retrying? For example, should we add 
something like this very early in the method? Doing that would make it easier 
to phrase this message, since the `ConnectException` will only be used when at 
least 1 retry may be attempted.
   ```
   if (maxRetries <= 0) {
       // no special error handling
       return callable.call();
   }

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+     * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If 
<code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to 
<code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in 
between the retries.

Review comment:
       A few corrections to the JavaDoc:
   ```suggestion
        * The method executes the callable at least once, optionally retrying 
the callable if
        * {@link org.apache.kafka.connect.errors.RetriableException} is thrown. 
All other types of exceptions
        * are not caught and are rethrown. If all retries are exhausted, the 
last
        * exception is wrapped with a {@link 
org.apache.kafka.connect.errors.ConnectException} and thrown.
        *
        * <p>If <code>maxRetries</code> is set to 0, the task will be executed 
exactly once.
        * If <code>maxRetries</code> is set to <code>n</code>, the callable 
will be executed at
        * most <code>n + 1</code> times.
        *
        * <p>If <code>retryBackoffMs</code> is set to 0, no wait will happen in 
between the retries.
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+     * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If 
<code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to 
<code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in 
between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon 
receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long 
retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying 
automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxRetries - attempt, 
e.getMessage());
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            } catch (Exception e) {
+                log.warn("Non-retriable exception caught. Re-throwing. Reason: 
{}, {}", e.getClass(), e.getMessage());
+                throw e;

Review comment:
       Do we need this catch block and log message? We're re-throwing the 
exception, and the stack trace will include the fact that the function threw 
the exception, and the log message here doesn't seem to add much detail since 
it will be caught and dealt with by the caller.
   ```suggestion
   ```
   If we remove this, we should adjust the JavaDoc to not mention "rethrow" 
since we would not catch the problem and are not rethrowing the exception.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+     * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If 
<code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to 
<code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in 
between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon 
receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long 
retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying 
automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxRetries - attempt, 
e.getMessage());

Review comment:
       Are we intentionally not logging the exception as the extra parameter? 
If the exception wraps a more useful exception, we won't see any information 
about the wrapped exception unless we can see the stack trace in the warning 
log message.

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.Callable;
+
+@RunWith(PowerMockRunner.class)
+public class RetryUtilTest {
+
+    @Mock
+    private Callable<String> mockCallable;
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() throws Exception {
+        mockCallable = Mockito.mock(Callable.class);
+    }
+
+    @Test
+    public void TestSuccess() throws Exception {

Review comment:
       Nit: all of these test methods should start with a lowercase character.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+     * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If 
<code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to 
<code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in 
between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon 
receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.

Review comment:
       ```suggestion
        * @param callable       the function to execute.
        * @param maxRetries     maximum number of retries; must be 0 or more
        * @param retryBackoffMs the number of milliseconds to delay upon 
receiving a
        *.                      {@link 
org.apache.kafka.connect.errors.RetriableException}
        *                       before retrying again; must be 0 or more
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being 
thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are 
exhausted, then the last
+     * exception is wrapped into a {@link 
org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If 
<code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to 
<code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in 
between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon 
receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long 
retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | 
org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying 
automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxRetries - attempt, 
e.getMessage());
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            } catch (Exception e) {
+                log.warn("Non-retriable exception caught. Re-throwing. Reason: 
{}, {}", e.getClass(), e.getMessage());
+                throw e;
+            }
+            Utils.sleep(retryBackoffMs);

Review comment:
       ```suggestion
               if (attempt < maxAttempts) {
                   Utils.sleep(retryBackoffMs);
               }
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -703,6 +706,20 @@ public Config describeTopicConfig(String topic) {
         return result;
     }
 
+    protected Map<TopicPartition, Long> retryEndOffsets(Set<TopicPartition> 
partitions) {
+        int maxRetries = 
Integer.parseInt(adminConfig.getOrDefault(AdminClientConfig.RETRIES_CONFIG, 
DEFAULT_ADMIN_CLIENT_RETRIES).toString());

Review comment:
       Maybe add a comment here:
   ```suggestion
           // These will be 0 or higher, enforced by the admin client when it 
is instantiated and configured
           int maxRetries = 
Integer.parseInt(adminConfig.getOrDefault(AdminClientConfig.RETRIES_CONFIG, 
DEFAULT_ADMIN_CLIENT_RETRIES).toString());
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to