C0urante commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1221803692
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ########## @@ -374,19 +374,38 @@ public synchronized void connectorOffsets(String connName, Callback<ConnectorOff } @Override - public synchronized void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) { + protected synchronized void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) { + if (!modifyConnectorOffsetsChecks(connName, cb)) { + return; + } + + if (offsets == null) { + worker.resetConnectorOffsets(connName, configState.connectorConfig(connName), cb); + } else { + worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, cb); + } + } + + /** + * This method performs a few checks for external requests to modify (alter or reset) connector offsets and + * completes the callback exceptionally if any check fails. + * @param connName the name of the connector whose offsets are to be modified + * @param cb callback to invoke upon completion + * @return true if all the checks passed, false otherwise + */ + private boolean modifyConnectorOffsetsChecks(String connName, Callback<Message> cb) { if (!configState.contains(connName)) { cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); - return; + return false; } if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) { - cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. " + - "This can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null); - return; + cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified externally. " + Review Comment: Nit: "modified externally" may not be very clear to users. 
I think "altered" captures the general concept well enough even if they're doing a reset, but if you'd like to be more precise, maybe we can say "altered or reset"? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1587,10 +1595,11 @@ private boolean alterConnectorOffsetsChecks(String connName, Callback<Message> c // If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above), // we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop). // Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled - // naturally because requests to alter consumer group offsets will fail if there are still active members in the group. + // naturally because requests to alter consumer group offsets / delete consumer groups will fail if there are still active members + // in the group. if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) { - callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. This " + - "can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null); + callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified externally. " + Review Comment: Same thought RE "modified externally" (though seeing how "modified"/"modify" is used in some other log and exception messages in this class, I think just "modified" could work here too). 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, Admin admin = adminFactory.apply(adminConfig); try { - List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); - - Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); - - if (!offsetsToAlter.isEmpty()) { - log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", - connName, offsetsToAlter); - AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + Map<TopicPartition, Long> offsetsToWrite; + if (isReset) { + offsetsToWrite = new HashMap<>(); + ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, - alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); + try { + admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) + .partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + + log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", + connName, groupId, offsetsToWrite.keySet()); + } catch (Exception e) { + Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); + log.error("Failed to list offsets prior to resetting sink connector offsets", e); + 
cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null); + return; + } + } else { + offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } - Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() == null) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - - if (!partitionsToReset.isEmpty()) { - log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", - connName, partitionsToReset); - DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, - deleteConsumerGroupOffsetsOptions); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + // This should only occur for an offset reset request when: + // 1. There was a prior attempt to reset offsets + // OR + // 2. No offsets have been committed yet + if (offsetsToWrite.isEmpty()) { + completeAlterOffsetsCallback(alterOffsetsResult, isReset, cb); + return; } - @SuppressWarnings("rawtypes") - KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0])); - - compositeAdminFuture.whenComplete((ignored, error) -> { - if (error != null) { - // When a consumer group is non-empty, only group members can commit offsets. 
An attempt to alter offsets via the admin client - // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped - // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to - // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException - if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) { - cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + - "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + - "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " + - "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), - null); - } else { - cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null); - } - } else { - completeAlterOffsetsCallback(alterOffsetsResult, cb); - } - }).whenComplete((ignored, ignoredError) -> { - // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw - // an exception itself, and we can thus ignore the error here - Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); - }); + if (isReset) { + resetSinkConnectorOffsets(connName, groupId, admin, cb, alterOffsetsResult); + } else { + alterSinkConnectorOffsets(connName, groupId, admin, offsetsToWrite, cb, alterOffsetsResult); + } } catch (Throwable t) { - Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + Utils.closeQuietly(admin, "Offset modification admin for sink connector " + connName); throw t; } } catch (Throwable t) { - 
cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null); + cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for sink connector " + connName), null); } })); } /** - * Alter a source connector's offsets. + * Alter a sink connector's consumer group offsets. This is done via calls to {@link Admin#alterConsumerGroupOffsets} + * and / or {@link Admin#deleteConsumerGroupOffsets}. * - * @param connName the name of the source connector whose offsets are to be altered + * @param connName the name of the sink connector whose offsets are to be altered + * @param groupId the sink connector's consumer group ID + * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; should be closed after use Review Comment: We close the admin client in this method, right? I think we can be clearer about that: ```suggestion * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; will be closed after use ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, Admin admin = adminFactory.apply(adminConfig); try { - List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); - - Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); - - if (!offsetsToAlter.isEmpty()) { - log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", - connName, offsetsToAlter); - AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + Map<TopicPartition, Long> offsetsToWrite; + if (isReset) { + offsetsToWrite = new HashMap<>(); + 
ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); Review Comment: Can we use a `Timer` to limit the total runtime of this method to 90 seconds, instead of using a 90-second timeout for each admin client request? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1460,30 +1602,31 @@ void alterSourceConnectorOffsets(String connName, Connector connector, Map<Strin try { offsetWriterCallback.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { - throw new ConnectException("Failed to alter offsets for source connector " + connName, e.getCause()); + throw new ConnectException("Failed to modify offsets for source connector " + connName, e.getCause()); } catch (TimeoutException e) { - throw new ConnectException("Timed out while attempting to alter offsets for source connector " + connName, e); + throw new ConnectException("Timed out while attempting to modify offsets for source connector " + connName, e); } catch (InterruptedException e) { - throw new ConnectException("Unexpectedly interrupted while attempting to alter offsets for source connector " + connName, e); + throw new ConnectException("Unexpectedly interrupted while attempting to modify offsets for source connector " + connName, e); } - completeAlterOffsetsCallback(alterOffsetsResult, cb); + completeAlterOffsetsCallback(alterOffsetsResult, isReset, cb); } catch (Throwable t) { - log.error("Failed to alter offsets for source connector {}", connName, t); - cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for source connector " + connName), null); + log.error("Failed to modify offsets for source connector {}", connName, t); + cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for source connector " + connName), null); } finally { - 
Utils.closeQuietly(offsetStore::stop, "Offset store for offset alter request for connector " + connName); + Utils.closeQuietly(offsetStore::stop, "Offset store for offset modification request for connector " + connName); } })); } - private void completeAlterOffsetsCallback(boolean alterOffsetsResult, Callback<Message> cb) { + private void completeAlterOffsetsCallback(boolean alterOffsetsResult, boolean isReset, Callback<Message> cb) { Review Comment: Should we rename this to `completeModifyOffsetsCallback` to match the other naming changes we've made in this class? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, Admin admin = adminFactory.apply(adminConfig); try { - List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); - - Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); - - if (!offsetsToAlter.isEmpty()) { - log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", - connName, offsetsToAlter); - AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + Map<TopicPartition, Long> offsetsToWrite; + if (isReset) { + offsetsToWrite = new HashMap<>(); + ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, - alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); + try { + admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) + 
.partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + + log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", + connName, groupId, offsetsToWrite.keySet()); + } catch (Exception e) { + Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); + log.error("Failed to list offsets prior to resetting sink connector offsets", e); + cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null); + return; + } + } else { + offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } - Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() == null) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - - if (!partitionsToReset.isEmpty()) { - log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", - connName, partitionsToReset); - DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, - deleteConsumerGroupOffsetsOptions); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + // This should only occur for an offset reset request when: + // 1. 
There was a prior attempt to reset offsets + // OR + // 2. No offsets have been committed yet + if (offsetsToWrite.isEmpty()) { + completeAlterOffsetsCallback(alterOffsetsResult, isReset, cb); + return; } - @SuppressWarnings("rawtypes") - KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0])); - - compositeAdminFuture.whenComplete((ignored, error) -> { - if (error != null) { - // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client - // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped - // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to - // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException - if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) { - cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + - "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + - "completed. If the connector is in a stopped state, this operation can be safely retried. 
If it doesn't eventually succeed, the " + - "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), - null); - } else { - cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null); - } - } else { - completeAlterOffsetsCallback(alterOffsetsResult, cb); - } - }).whenComplete((ignored, ignoredError) -> { - // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw - // an exception itself, and we can thus ignore the error here - Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); - }); + if (isReset) { + resetSinkConnectorOffsets(connName, groupId, admin, cb, alterOffsetsResult); + } else { + alterSinkConnectorOffsets(connName, groupId, admin, offsetsToWrite, cb, alterOffsetsResult); + } } catch (Throwable t) { - Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + Utils.closeQuietly(admin, "Offset modification admin for sink connector " + connName); throw t; } } catch (Throwable t) { - cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null); + cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for sink connector " + connName), null); } })); } /** - * Alter a source connector's offsets. + * Alter a sink connector's consumer group offsets. This is done via calls to {@link Admin#alterConsumerGroupOffsets} + * and / or {@link Admin#deleteConsumerGroupOffsets}. 
* - * @param connName the name of the source connector whose offsets are to be altered + * @param connName the name of the sink connector whose offsets are to be altered + * @param groupId the sink connector's consumer group ID + * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; should be closed after use + * @param offsetsToWrite a mapping from topic partitions to offsets that need to be written; may not be null or empty + * @param cb callback to invoke upon completion + * @param alterOffsetsResult the result of the call to {@link SinkConnector#alterOffsets} for the connector + */ + private void alterSinkConnectorOffsets(String connName, String groupId, Admin admin, Map<TopicPartition, Long> offsetsToWrite, + Callback<Message> cb, boolean alterOffsetsResult) { + List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); + + Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = offsetsToWrite.entrySet() + .stream() + .filter(entry -> entry.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); + + if (!offsetsToAlter.isEmpty()) { + log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", + connName, offsetsToAlter); + AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, + alterConsumerGroupOffsetsOptions); + + adminFutures.add(alterConsumerGroupOffsetsResult.all()); + } + + Set<TopicPartition> partitionsToReset = offsetsToWrite.entrySet() + .stream() + .filter(entry -> entry.getValue() == null) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + if (!partitionsToReset.isEmpty()) { + log.debug("Deleting the consumer group offsets for the following topic 
partitions using an admin client for sink connector {}: {}.", + connName, partitionsToReset); + DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, + deleteConsumerGroupOffsetsOptions); + + adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + } + + @SuppressWarnings("rawtypes") + KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0])); + + compositeAdminFuture.whenComplete((ignored, error) -> { + if (error != null) { + // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client + // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped + // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to + // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException + if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + + "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + + "completed. If the connector is in a stopped state, this operation can be safely retried. 
If it doesn't eventually succeed, the " + + "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), + null); + } else { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null); + } + } else { + completeAlterOffsetsCallback(alterOffsetsResult, false, cb); + } + }).whenComplete((ignored, ignoredError) -> { + // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw + // an exception itself, and we can thus ignore the error here + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + }); Review Comment: It looks like this is just pulled out from the [existing `alterSinkConnectorOffsets` method](https://github.com/apache/kafka/blob/9cfc4b9373bf887ec2fb95bc607f4ebd1c8613c0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1323-L1383) (which is fine); let me know if there are any other changes that I've missed.
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ########## @@ -574,6 +574,199 @@ public void alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect, "Source connector offsets should reflect the expected number of records produced"); } + @Test + public void testResetSinkConnectorOffsets() throws Exception { + resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); + } + + @Test + public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { + Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, + "overridden-group-id"); + resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka()); + // Ensure that the overridden consumer group ID was the one actually used + try (Admin admin = connect.kafka().createAdminClient()) { + Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get(); + assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId()))); + assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId()))); + } + } + + @Test + public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { + EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); + + try (AutoCloseable ignored = kafkaCluster::stop) { + kafkaCluster.start(); + + Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + + resetAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster); + } + } + + private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception { + int numPartitions = 3; + int numMessages = 10; + kafkaCluster.createTopic(TOPIC, numPartitions); + + // Produce numMessages messages to each partition + for (int partition = 0; partition < numPartitions; partition++) { + for (int message = 0; message < numMessages; message++) { + kafkaCluster.produce(TOPIC, partition, "key", "value"); + } + } + // Create sink connector + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); + + waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, numMessages, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Reset the sink connector's offsets + String response = connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + waitForEmptySinkConnectorOffsets(CONNECTOR_NAME); + + // Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error + response = connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. 
" + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + waitForEmptySinkConnectorOffsets(CONNECTOR_NAME); + + // Resume the connector and expect its offsets to catch up to the latest offsets + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + NUM_TASKS, + "Connector tasks did not resume in time" + ); + waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, 10, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + } + + @Test + public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { + connect.kafka().createTopic(TOPIC, 1); + + // Produce 10 messages + for (int message = 0; message < 10; message++) { + connect.kafka().produce(TOPIC, 0, "key", "value"); + } + + // Configure a sink connector whose sink task blocks in its stop method + Map<String, String> connectorConfigs = new HashMap<>(); + connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName()); + connectorConfigs.put(TOPICS_CONFIG, TOPIC); + connectorConfigs.put("block", "Task::stop"); + + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector tasks did not start in time."); + + waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, 10, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + + connect.stopConnector(CONNECTOR_NAME); + + // Try to reset the offsets + ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME)); + assertThat(e.getMessage(), containsString("zombie sink task")); + } + + @Test + public void testResetSourceConnectorOffsets() throws Exception { + resetAndVerifySourceConnectorOffsets(connect, 
baseSourceConnectorConfigs()); + } + + // Note that the following test also implicitly tests the custom offsets topic case since source connectors always use + // a separate offsets topic when exactly once support is enabled and the Kafka cluster targeted by the source connector + // is different from the Connect cluster's backing Kafka cluster. + @Test + public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabledAndDifferentKafkaClusterTargeted() throws Exception { + Properties brokerProps = new Properties(); + brokerProps.put("transaction.state.log.replication.factor", "1"); + brokerProps.put("transaction.state.log.min.isr", "1"); + EmbeddedKafkaCluster connectorTargetedKafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); + workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); + + // This embedded Connect cluster will internally spin up its own embedded Kafka cluster + EmbeddedConnectCluster exactlyOnceSupportEnabledConnectCluster = new EmbeddedConnectCluster.Builder() Review Comment: Do we need to close this when we're done with it? 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1268,39 +1270,55 @@ public void alterConnectorOffsets(String connName, Map<String, String> connector connector = plugins.newConnector(connectorClassOrAlias); if (ConnectUtils.isSinkConnector(connector)) { log.debug("Altering consumer group offsets for sink connector: {}", connName); - alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); + modifySinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); } else { log.debug("Altering offsets for source connector: {}", connName); - alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); + modifySourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); + } + } + } + + /** + * Reset a connector's offsets. + * + * @param connName the name of the connector whose offsets are to be reset + * @param connectorConfig the connector's configurations + * @param cb callback to invoke upon completion + */ + public void resetConnectorOffsets(String connName, Map<String, String> connectorConfig, Callback<Message> cb) { Review Comment: Do we need separate methods for resetting and altering offsets, or can we borrow the `modifyConnectorOffsets` strategy from the `AbstractHerder` and accept a possibly-null `offsets` parameter to distinguish between the two operations? If we're worried about accidentally introducing a nasty bug where an empty-bodied alter request causes an unintentional reset, we can add an integration test for that case. 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, Admin admin = adminFactory.apply(adminConfig); try { - List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); - - Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); - - if (!offsetsToAlter.isEmpty()) { - log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", - connName, offsetsToAlter); - AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + Map<TopicPartition, Long> offsetsToWrite; + if (isReset) { + offsetsToWrite = new HashMap<>(); + ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, - alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); + try { + admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) + .partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + + log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", + connName, groupId, offsetsToWrite.keySet()); + } catch (Exception e) { + Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); + log.error("Failed to list offsets prior to resetting sink connector offsets", e); + 
cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null); + return; + } + } else { + offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } - Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() == null) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - - if (!partitionsToReset.isEmpty()) { - log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", - connName, partitionsToReset); - DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, - deleteConsumerGroupOffsetsOptions); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + // This should only occur for an offset reset request when: + // 1. There was a prior attempt to reset offsets + // OR + // 2. No offsets have been committed yet + if (offsetsToWrite.isEmpty()) { + completeAlterOffsetsCallback(alterOffsetsResult, isReset, cb); + return; } - @SuppressWarnings("rawtypes") - KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0])); - - compositeAdminFuture.whenComplete((ignored, error) -> { - if (error != null) { - // When a consumer group is non-empty, only group members can commit offsets. 
An attempt to alter offsets via the admin client - // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped - // completely or if the connector is resumed while the alter offsets request is being processed). Similarly, an attempt to - // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException - if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) { - cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + - "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + - "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " + - "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), - null); - } else { - cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null); - } - } else { - completeAlterOffsetsCallback(alterOffsetsResult, cb); - } - }).whenComplete((ignored, ignoredError) -> { - // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw - // an exception itself, and we can thus ignore the error here - Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); - }); + if (isReset) { + resetSinkConnectorOffsets(connName, groupId, admin, cb, alterOffsetsResult); + } else { + alterSinkConnectorOffsets(connName, groupId, admin, offsetsToWrite, cb, alterOffsetsResult); + } } catch (Throwable t) { - Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + Utils.closeQuietly(admin, "Offset modification admin for sink connector " + connName); throw t; } } catch (Throwable t) { - 
cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to alter offsets for sink connector " + connName), null); + cb.onCompletion(ConnectUtils.maybeWrap(t, "Failed to modify offsets for sink connector " + connName), null); } })); } /** - * Alter a source connector's offsets. + * Alter a sink connector's consumer group offsets. This is done via calls to {@link Admin#alterConsumerGroupOffsets} + * and / or {@link Admin#deleteConsumerGroupOffsets}. * - * @param connName the name of the source connector whose offsets are to be altered + * @param connName the name of the sink connector whose offsets are to be altered + * @param groupId the sink connector's consumer group ID + * @param admin the {@link Admin admin client} to be used for altering the consumer group offsets; should be closed after use + * @param offsetsToWrite a mapping from topic partitions to offsets that need to be written; may not be null or empty + * @param cb callback to invoke upon completion + * @param alterOffsetsResult the result of the call to {@link SinkConnector#alterOffsets} for the connector + */ + private void alterSinkConnectorOffsets(String connName, String groupId, Admin admin, Map<TopicPartition, Long> offsetsToWrite, + Callback<Message> cb, boolean alterOffsetsResult) { + List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); + + Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = offsetsToWrite.entrySet() + .stream() + .filter(entry -> entry.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); + + if (!offsetsToAlter.isEmpty()) { + log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", + connName, offsetsToAlter); + AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = 
admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, + alterConsumerGroupOffsetsOptions); + + adminFutures.add(alterConsumerGroupOffsetsResult.all()); + } + + Set<TopicPartition> partitionsToReset = offsetsToWrite.entrySet() + .stream() + .filter(entry -> entry.getValue() == null) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + if (!partitionsToReset.isEmpty()) { + log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", + connName, partitionsToReset); + DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( + (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, + deleteConsumerGroupOffsetsOptions); + + adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + } + + @SuppressWarnings("rawtypes") + KafkaFuture<Void> compositeAdminFuture = KafkaFuture.allOf(adminFutures.toArray(new KafkaFuture[0])); + + compositeAdminFuture.whenComplete((ignored, error) -> { + if (error != null) { + // When a consumer group is non-empty, only group members can commit offsets. An attempt to alter offsets via the admin client + // will result in an UnknownMemberIdException if the consumer group is non-empty (i.e. if the sink tasks haven't stopped + // completely or if the connector is resumed while the alter offsets request is being processed). 
Similarly, an attempt to + // delete consumer group offsets for a non-empty consumer group will result in a GroupSubscribedToTopicException + if (error instanceof UnknownMemberIdException || error instanceof GroupSubscribedToTopicException) { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName + " either because its tasks " + + "haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully " + + "completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the " + + "Connect cluster may need to be restarted to get rid of the zombie sink tasks."), + null); + } else { + cb.onCompletion(new ConnectException("Failed to alter consumer group offsets for connector " + connName, error), null); + } + } else { + completeAlterOffsetsCallback(alterOffsetsResult, false, cb); + } + }).whenComplete((ignored, ignoredError) -> { + // errors originating from the original future are handled in the prior whenComplete invocation which isn't expected to throw + // an exception itself, and we can thus ignore the error here + Utils.closeQuietly(admin, "Offset alter admin for sink connector " + connName); + }); + } + + /** + * Reset a sink connector's consumer group offsets. 
This is done by deleting the consumer group via a call to + * {@link Admin#deleteConsumerGroups} + * + * @param connName the name of the sink connector whose offsets are to be reset + * @param groupId the sink connector's consumer group ID + * @param admin the {@link Admin admin client} to be used for resetting the consumer group offsets; should be closed after use Review Comment: ```suggestion * @param admin the {@link Admin admin client} to be used for resetting the consumer group offsets; will be closed after use ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ########## @@ -574,6 +574,199 @@ public void alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect, "Source connector offsets should reflect the expected number of records produced"); } + @Test + public void testResetSinkConnectorOffsets() throws Exception { + resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); + } + + @Test + public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { + Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, + "overridden-group-id"); + resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka()); + // Ensure that the overridden consumer group ID was the one actually used + try (Admin admin = connect.kafka().createAdminClient()) { + Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get(); + assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId()))); + assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId()))); + } + } + + @Test + public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() 
throws Exception { + EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); + + try (AutoCloseable ignored = kafkaCluster::stop) { + kafkaCluster.start(); + + Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + + resetAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster); + } + } + + private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception { + int numPartitions = 3; + int numMessages = 10; + kafkaCluster.createTopic(TOPIC, numPartitions); + + // Produce numMessages messages to each partition + for (int partition = 0; partition < numPartitions; partition++) { + for (int message = 0; message < numMessages; message++) { + kafkaCluster.produce(TOPIC, partition, "key", "value"); + } + } + // Create sink connector + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); + + waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, numMessages, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Reset the sink connector's offsets + String response = connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. 
" + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + waitForEmptySinkConnectorOffsets(CONNECTOR_NAME); + + // Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error + response = connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + waitForEmptySinkConnectorOffsets(CONNECTOR_NAME); + + // Resume the connector and expect its offsets to catch up to the latest offsets + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + NUM_TASKS, + "Connector tasks did not resume in time" + ); + waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, "test-topic", numPartitions, 10, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + } + + @Test + public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { + connect.kafka().createTopic(TOPIC, 1); + + // Produce 10 messages + for (int message = 0; message < 10; message++) { + connect.kafka().produce(TOPIC, 0, "key", "value"); + } + + // Configure a sink connector whose sink task blocks in its stop method + Map<String, String> connectorConfigs = new HashMap<>(); + connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName()); + connectorConfigs.put(TOPICS_CONFIG, TOPIC); + connectorConfigs.put("block", "Task::stop"); + + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector tasks did not start in time."); + + 
waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, 10, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + + connect.stopConnector(CONNECTOR_NAME); + + // Try to reset the offsets + ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME)); + assertThat(e.getMessage(), containsString("zombie sink task")); + } + + @Test + public void testResetSourceConnectorOffsets() throws Exception { + resetAndVerifySourceConnectorOffsets(connect, baseSourceConnectorConfigs()); + } + + // Note that the following test also implicitly tests the custom offsets topic case since source connectors always use + // a separate offsets topic when exactly once support is enabled and the Kafka cluster targeted by the source connector + // is different from the Connect cluster's backing Kafka cluster. + @Test + public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabledAndDifferentKafkaClusterTargeted() throws Exception { + Properties brokerProps = new Properties(); + brokerProps.put("transaction.state.log.replication.factor", "1"); + brokerProps.put("transaction.state.log.min.isr", "1"); + EmbeddedKafkaCluster connectorTargetedKafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); + workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); + + // This embedded Connect cluster will internally spin up its own embedded Kafka cluster + EmbeddedConnectCluster exactlyOnceSupportEnabledConnectCluster = new EmbeddedConnectCluster.Builder() + .name("connect-cluster") + .brokerProps(brokerProps) + .numWorkers(NUM_WORKERS) + .workerProps(workerProps) + .build(); + Review Comment: Nit: extra newline ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at: users@infra.apache.org