yashmayya commented on code in PR #14966:
URL: https://github.com/apache/kafka/pull/14966#discussion_r1426668693


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -895,6 +909,55 @@ private Map<String, String> baseSourceConnectorConfigs() {
         return props;
     }
 
+    /**
+     * Modify (i.e., alter or reset) the offsets for a sink connector, with retry logic to
+     * handle cases where laggy task shutdown may have left a consumer in the group.
+     * @param offsetsToAlter the offsets to alter for the sink connector, or null if
+     *                       the connector's offets should be reset instead
+     * @return the response from the REST API, if the request was successful
+     * @throws InterruptedException if the thread is interrupted while waiting for a
+     * request to modify the connector's offsets to succeed
+     * @see <a href="https://issues.apache.org/jira/browse/KAFKA-15826">KAFKA-15826</a>
+     */
+    private String modifySinkConnectorOffsetsWithRetry(ConnectorOffsets offsetsToAlter) throws InterruptedException {
+        // Some retry logic is necessary to account for KAFKA-15826,
+        // where laggy sink task startup/shutdown can leave consumers running

Review Comment:
   Okay fair enough, thanks for the explanation!



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -895,6 +909,56 @@ private Map<String, String> baseSourceConnectorConfigs() {
         return props;
     }
 
+    /**
+     * Modify (i.e., alter or reset) the offsets for a sink connector, with retry logic to
+     * handle cases where laggy task shutdown may have left a consumer in the group.
+     * @param offsetsToAlter the offsets to alter for the sink connector, or null if
+     *                       the connector's offsets should be reset instead
+     * @return the response from the REST API, if the request was successful
+     * @throws InterruptedException if the thread is interrupted while waiting for a
+     * request to modify the connector's offsets to succeed
+     * @see <a href="https://issues.apache.org/jira/browse/KAFKA-15826">KAFKA-15826</a>
+     */
+    private String modifySinkConnectorOffsetsWithRetry(ConnectorOffsets offsetsToAlter) throws InterruptedException {
+        // Some retry logic is necessary to account for KAFKA-15826,
+        // where laggy sink task startup/shutdown can leave consumers running
+        String modifyVerb = offsetsToAlter != null ?  "alter" : "reset";
+        String conditionDetails = "Failed to " + modifyVerb + " sink connector offsets in time";
+        AtomicReference<String> responseReference = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    try {
+                        if (offsetsToAlter == null) {
+                            responseReference.set(connect.resetConnectorOffsets(connectorName));
+                        } else {
+                            responseReference.set(connect.alterConnectorOffsets(connectorName, offsetsToAlter));
+                        }
+                        return true;
+                    } catch (ConnectRestException e) {
+                        boolean internalServerError = e.statusCode() == INTERNAL_SERVER_ERROR.getStatusCode();
+
+                        String message = Optional.of(e.getMessage()).orElse("");
+                        boolean failedToResetConsumerOffsets = message.contains(

Review Comment:
   ```suggestion
                           boolean failedToModifyConsumerOffsets = message.contains(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
