C0urante commented on code in PR #14966:
URL: https://github.com/apache/kafka/pull/14966#discussion_r1425557390


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -895,6 +909,55 @@ private Map<String, String> baseSourceConnectorConfigs() {
         return props;
     }
 
+    /**
+     * Modify (i.e., alter or reset) the offsets for a sink connector, with retry logic to
+     * handle cases where laggy task shutdown may have left a consumer in the group.
+     * @param offsetsToAlter the offsets to alter for the sink connector, or null if
+     *                       the connector's offsets should be reset instead
+     * @return the response from the REST API, if the request was successful
+     * @throws InterruptedException if the thread is interrupted while waiting for a
+     * request to modify the connector's offsets to succeed
+     * @see <a href="https://issues.apache.org/jira/browse/KAFKA-15826">KAFKA-15826</a>
+     */
+    private String modifySinkConnectorOffsetsWithRetry(ConnectorOffsets offsetsToAlter) throws InterruptedException {
+        // Some retry logic is necessary to account for KAFKA-15826,
+        // where laggy sink task startup/shutdown can leave consumers running
+        String conditionDetails = "Failed to "
+                + (offsetsToAlter != null ?  "alter" : "reset")
+                + " sink connector offsets in time";
+        AtomicReference<String> responseReference = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    try {
+                        if (offsetsToAlter == null) {
+                            responseReference.set(connect.resetConnectorOffsets(connectorName));
+                        } else {
+                            responseReference.set(connect.alterConnectorOffsets(connectorName, offsetsToAlter));
+                        }
+                        return true;
+                    } catch (ConnectRestException e) {
+                        boolean internalServerError = e.statusCode() == INTERNAL_SERVER_ERROR.getStatusCode();
+
+                        String message = Optional.of(e.getMessage()).orElse("");
+                        boolean failedToResetConsumerOffsets = message.contains("Failed to reset consumer group offsets for connector");

Review Comment:
   Ah yep, good catch! Thanks.


