yashmayya commented on code in PR #14966: URL: https://github.com/apache/kafka/pull/14966#discussion_r1426668693
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ########## @@ -895,6 +909,55 @@ private Map<String, String> baseSourceConnectorConfigs() { return props; } + /** + * Modify (i.e., alter or reset) the offsets for a sink connector, with retry logic to + * handle cases where laggy task shutdown may have left a consumer in the group. + * @param offsetsToAlter the offsets to alter for the sink connector, or null if + * the connector's offsets should be reset instead + * @return the response from the REST API, if the request was successful + * @throws InterruptedException if the thread is interrupted while waiting for a + * request to modify the connector's offsets to succeed + * @see <a href="https://issues.apache.org/jira/browse/KAFKA-15826">KAFKA-15826</a> + */ + private String modifySinkConnectorOffsetsWithRetry(ConnectorOffsets offsetsToAlter) throws InterruptedException { + // Some retry logic is necessary to account for KAFKA-15826, + // where laggy sink task startup/shutdown can leave consumers running Review Comment: Okay fair enough, thanks for the explanation! ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ########## @@ -895,6 +909,56 @@ private Map<String, String> baseSourceConnectorConfigs() { return props; } + /** + * Modify (i.e., alter or reset) the offsets for a sink connector, with retry logic to + * handle cases where laggy task shutdown may have left a consumer in the group. 
+ * @param offsetsToAlter the offsets to alter for the sink connector, or null if + * the connector's offsets should be reset instead + * @return the response from the REST API, if the request was successful + * @throws InterruptedException if the thread is interrupted while waiting for a + * request to modify the connector's offsets to succeed + * @see <a href="https://issues.apache.org/jira/browse/KAFKA-15826">KAFKA-15826</a> + */ + private String modifySinkConnectorOffsetsWithRetry(ConnectorOffsets offsetsToAlter) throws InterruptedException { + // Some retry logic is necessary to account for KAFKA-15826, + // where laggy sink task startup/shutdown can leave consumers running + String modifyVerb = offsetsToAlter != null ? "alter" : "reset"; + String conditionDetails = "Failed to " + modifyVerb + " sink connector offsets in time"; + AtomicReference<String> responseReference = new AtomicReference<>(); + waitForCondition( + () -> { + try { + if (offsetsToAlter == null) { + responseReference.set(connect.resetConnectorOffsets(connectorName)); + } else { + responseReference.set(connect.alterConnectorOffsets(connectorName, offsetsToAlter)); + } + return true; + } catch (ConnectRestException e) { + boolean internalServerError = e.statusCode() == INTERNAL_SERVER_ERROR.getStatusCode(); + + String message = Optional.of(e.getMessage()).orElse(""); + boolean failedToResetConsumerOffsets = message.contains( Review Comment: ```suggestion boolean failedToModifyConsumerOffsets = message.contains( ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org