yashmayya commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1238244318
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1320,89 +1317,192 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, Admin admin = adminFactory.apply(adminConfig); try { - List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); - - Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); - - if (!offsetsToAlter.isEmpty()) { - log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", - connName, offsetsToAlter); - AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, - alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); + Map<TopicPartition, Long> offsetsToWrite; + if (isReset) { + offsetsToWrite = new HashMap<>(); + ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions() + .timeoutMs((int) timer.remainingMs()); + try { + admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) + .partitionsToOffsetAndMetadata() + .get(timer.remainingMs(), TimeUnit.MILLISECONDS) + .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + + timer.update(); + log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", + connName, groupId, offsetsToWrite.keySet()); + } catch (Exception e) { + Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); + log.error("Failed to list offsets prior to resetting offsets for 
sink connector {}", connName, e); + cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting offsets for sink connector " + connName, e), null); + return; + } + } else { + offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } - Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() == null) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - - if (!partitionsToReset.isEmpty()) { - log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", - connName, partitionsToReset); - DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, - deleteConsumerGroupOffsetsOptions); - - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); + } catch (UnsupportedOperationException e) { + log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets", + connName, e); + throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); } + updateTimerAndCheckExpiry(timer, "Timed out while calling the 'alterOffsets' method for sink connector " + connName); Review Comment: Hm, good point: we really don't want to get into a situation where, let's say, a call to `SinkConnector::alterOffsets` succeeds but the call to delete the consumer group fails, and then any subsequent external requests to delete the connector's offsets fail because the `SinkConnector::alterOffsets` method bails out. 
On the other hand, I'm also concerned about making the Javadoc for those methods overly complicated. Furthermore, since we'll be calling `alterOffsets` on separate `Connector` instances, it's not like they can track state between multiple calls. Any suggestions on the wording so that we can maintain the right balance? Maybe we could add a hint that offsets that appear non-existent should be handled gracefully if possible (this also covers the case where an alter offsets request is used to skip over some data for sink connectors)? ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ########## @@ -574,6 +574,274 @@ public void alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect, "Source connector offsets should reflect the expected number of records produced"); } + @Test + public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws Exception { + // Create a source connector and stop it + connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs()); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME)); + + String content = "[]"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot deserialize value")); + } + + content = "{}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(400, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request")); + } + + content = "{\"key\": []}"; + try (Response response 
= connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Unrecognized field")); + } + + content = "{\"offsets\": []}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(400, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request")); + } + + content = "{\"offsets\": {}}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot deserialize value")); + } + + content = "{\"offsets\": [123]}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot construct instance")); + } + + content = "{\"offsets\": [{\"key\": \"val\"}]}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Unrecognized field")); + } + + content = "{\"offsets\": [{\"partition\": []]}]}"; + try (Response response = connect.requestPatch(url, content)) { + assertEquals(500, response.getStatus()); + assertThat(response.getEntity().toString(), containsString("Cannot deserialize value")); + } + } + + @Test + public void testResetSinkConnectorOffsets() throws Exception { + resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); + } + + @Test + public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { + Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, + "overridden-group-id"); + resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka()); + 
// Ensure that the overridden consumer group ID was the one actually used + try (Admin admin = connect.kafka().createAdminClient()) { + Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get(); + assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId()))); + assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId()))); + } + } + + @Test + public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { + EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); + + try (AutoCloseable ignored = kafkaCluster::stop) { + kafkaCluster.start(); + + Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + + resetAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster); + } + } + + private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception { + int numPartitions = 3; + int numMessages = 10; + kafkaCluster.createTopic(TOPIC, numPartitions); + + // Produce numMessages messages to each partition + for (int partition = 0; partition < numPartitions; partition++) { + for (int message = 0; message < numMessages; message++) { + kafkaCluster.produce(TOPIC, partition, "key", "value"); + } + } + // Create sink connector + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); + + 
waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, numMessages, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Reset the sink connector's offsets + String response = connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + waitForEmptySinkConnectorOffsets(CONNECTOR_NAME); + + // Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error + response = connect.resetConnectorOffsets(CONNECTOR_NAME); + assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. 
" + + "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses.")); + + waitForEmptySinkConnectorOffsets(CONNECTOR_NAME); + + // Resume the connector and expect its offsets to catch up to the latest offsets + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + NUM_TASKS, + "Connector tasks did not resume in time" + ); + waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, 10, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + } + + @Test + public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { + connect.kafka().createTopic(TOPIC, 1); + + // Produce 10 messages + for (int message = 0; message < 10; message++) { + connect.kafka().produce(TOPIC, 0, "key", "value"); + } + + // Configure a sink connector whose sink task blocks in its stop method + Map<String, String> connectorConfigs = new HashMap<>(); + connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName()); + connectorConfigs.put(TOPICS_CONFIG, TOPIC); + connectorConfigs.put("block", "Task::stop"); + + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector tasks did not start in time."); + + waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, 10, + "Sink connector consumer group offsets should catch up to the topic end offsets"); + + connect.stopConnector(CONNECTOR_NAME); + + // Try to reset the offsets + ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME)); + assertThat(e.getMessage(), containsString("zombie sink task")); + } + + @Test + public void testResetSourceConnectorOffsets() throws Exception { + resetAndVerifySourceConnectorOffsets(connect, 
baseSourceConnectorConfigs()); + } + + @Test + public void testResetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception { + Map<String, String> connectorConfigs = baseSourceConnectorConfigs(); + connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic"); + resetAndVerifySourceConnectorOffsets(connect, connectorConfigs); + } + + @Test + public void testResetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception { + EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties()); + + try (AutoCloseable ignored = kafkaCluster::stop) { + kafkaCluster.start(); + + Map<String, String> connectorConfigs = baseSourceConnectorConfigs(); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + kafkaCluster.bootstrapServers()); + + resetAndVerifySourceConnectorOffsets(connect, connectorConfigs); + } + } + + @Test + public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception { + Properties brokerProps = new Properties(); + brokerProps.put("transaction.state.log.replication.factor", "1"); + brokerProps.put("transaction.state.log.min.isr", "1"); + workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); + + // This embedded Connect cluster will internally spin up its own embedded Kafka cluster + EmbeddedConnectCluster exactlyOnceSupportEnabledConnectCluster = new EmbeddedConnectCluster.Builder() Review Comment: Hah, I was considering doing the same thing earlier but discarded it thinking that it'd add more noise for review (since we'd touch existing test cases too). 
Thanks for the push in the right direction (I agree that adding a couple of lines of code to tests here and there is worth it for reducing test runtime bloat); I've made this change 👍 ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, Admin admin = adminFactory.apply(adminConfig); try { - List<KafkaFuture<Void>> adminFutures = new ArrayList<>(); - - Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); - - if (!offsetsToAlter.isEmpty()) { - log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", - connName, offsetsToAlter); - AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( + Map<TopicPartition, Long> offsetsToWrite; + if (isReset) { + offsetsToWrite = new HashMap<>(); + ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, - alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); + try { + admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) + .partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + + log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", + connName, groupId, offsetsToWrite.keySet()); + } catch 
(Exception e) { + Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); + log.error("Failed to list offsets prior to resetting sink connector offsets", e); + cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null); + return; + } + } else { + offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } - Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() - .stream() - .filter(entry -> entry.getValue() == null) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - - if (!partitionsToReset.isEmpty()) { - log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", - connName, partitionsToReset); - DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( - (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); - DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, - deleteConsumerGroupOffsetsOptions); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); + // This should only occur for an offset reset request when: + // 1. There was a prior attempt to reset offsets + // OR + // 2. No offsets have been committed yet + if (offsetsToWrite.isEmpty()) { Review Comment: Yeah, that makes sense. I've filed this follow-up Jira ticket so that we can re-visit if necessary - https://issues.apache.org/jira/browse/KAFKA-15113 -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org