yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1238244318
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1317,192 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
         Admin admin = adminFactory.apply(adminConfig);
         try {
-            List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-            Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                    .stream()
-                    .filter(entry -> entry.getValue() != null)
-                    .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-            if (!offsetsToAlter.isEmpty()) {
-                log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                        connName, offsetsToAlter);
-                AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
-                        (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                        alterConsumerGroupOffsetsOptions);
-
-                adminFutures.add(alterConsumerGroupOffsetsResult.all());
+            Map<TopicPartition, Long> offsetsToWrite;
+            if (isReset) {
+                offsetsToWrite = new HashMap<>();
+                ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+                        .timeoutMs((int) timer.remainingMs());
+                try {
+                    admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                            .partitionsToOffsetAndMetadata()
+                            .get(timer.remainingMs(), TimeUnit.MILLISECONDS)
+                            .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                    timer.update();
+                    log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                            connName, groupId, offsetsToWrite.keySet());
+                } catch (Exception e) {
+                    Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                    log.error("Failed to list offsets prior to resetting offsets for sink connector {}", connName, e);
+                    cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting offsets for sink connector " + connName, e), null);
+                    return;
+                }
+            } else {
+                offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
             }
-            Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                    .stream()
-                    .filter(entry -> entry.getValue() == null)
-                    .map(Map.Entry::getKey)
-                    .collect(Collectors.toSet());
-
-            if (!partitionsToReset.isEmpty()) {
-                log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                        connName, partitionsToReset);
-                DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                        (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                        deleteConsumerGroupOffsetsOptions);
-
-                adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+            boolean alterOffsetsResult;
+            try {
+                alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+            } catch (UnsupportedOperationException e) {
+                log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets",
+                        connName, e);
+                throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                        "modification of offsets", e);
             }
+            updateTimerAndCheckExpiry(timer, "Timed out while calling the 'alterOffsets' method for sink connector " + connName);
Review Comment:
Hm, good point. We really don't want a situation where, say, a call to
`SinkConnector::alterOffsets` succeeds but the subsequent call to delete the
consumer group fails, and then every later external request to delete the
connector's offsets also fails because `SinkConnector::alterOffsets` bails out.
On the other hand, I'm also wary of making the Javadoc for those methods
overly complicated. Furthermore, since we'll be calling `alterOffsets` on
separate `Connector` instances, they can't track state across multiple calls.
Any suggestions on wording that strikes the right balance? Maybe we could add
a hint that offsets which appear to be non-existent should be handled
gracefully where possible (this also covers the case where an alter offsets
request is used to skip over some data for sink connectors)?
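To make that concrete, here's a rough connector-side sketch of what "handle
non-existent offsets gracefully" could look like (purely illustrative, not
code from this PR; the `clearExternalStateIfPresent` / `writeExternalState`
helpers are made-up stand-ins for connector-specific logic):

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

public class GracefulOffsetsSinkConnector extends SinkConnector {

    @Override
    public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
        offsets.forEach((partition, offset) -> {
            if (offset == null) {
                // Reset request: if the external system has no state for this
                // partition (e.g. a previous reset already succeeded), treat it
                // as a no-op so that retried reset requests remain idempotent.
                clearExternalStateIfPresent(partition);
            } else {
                // Alter request: accept offsets the connector has never observed,
                // which covers requests that skip over a range of records.
                writeExternalState(partition, offset);
            }
        });
        return true;
    }

    // Hypothetical stand-ins for connector-specific bookkeeping.
    private void clearExternalStateIfPresent(TopicPartition partition) { }

    private void writeExternalState(TopicPartition partition, long offset) { }

    @Override
    public void start(Map<String, String> props) { }

    @Override
    public Class<? extends Task> taskClass() {
        throw new UnsupportedOperationException("omitted from this sketch");
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        return Collections.emptyList();
    }

    @Override
    public void stop() { }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "0.0";
    }
}
```

The exact semantics of the return value aside, the key point is that neither
branch throws when a requested partition has no previously committed state.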
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -574,6 +574,274 @@ public void alterAndVerifySourceConnectorOffsets(EmbeddedConnectCluster connect,
                 "Source connector offsets should reflect the expected number of records produced");
     }
+    @Test
+    public void testAlterSourceConnectorOffsetsInvalidRequestBody() throws Exception {
+        // Create a source connector and stop it
+        connect.configureConnector(CONNECTOR_NAME, baseSourceConnectorConfigs());
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+        String url = connect.endpointForResource(String.format("connectors/%s/offsets", CONNECTOR_NAME));
+
+        String content = "[]";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Cannot deserialize value"));
+        }
+
+        content = "{}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(400, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request"));
+        }
+
+        content = "{\"key\": []}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Unrecognized field"));
+        }
+
+        content = "{\"offsets\": []}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(400, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Partitions / offsets need to be provided for an alter offsets request"));
+        }
+
+        content = "{\"offsets\": {}}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Cannot deserialize value"));
+        }
+
+        content = "{\"offsets\": [123]}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Cannot construct instance"));
+        }
+
+        content = "{\"offsets\": [{\"key\": \"val\"}]}";
+        try (Response response = connect.requestPatch(url, content)) {
+            assertEquals(500, response.getStatus());
+            assertThat(response.getEntity().toString(), containsString("Unrecognized field"));
+        }
+
+ content = "{\"offsets\": [{\"partition\": []]}]}";
+ try (Response response = connect.requestPatch(url, content)) {
+ assertEquals(500, response.getStatus());
+ assertThat(response.getEntity().toString(), containsString("Cannot
deserialize value"));
+ }
+ }
+
+    @Test
+    public void testResetSinkConnectorOffsets() throws Exception {
+        resetAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka());
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception {
+        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
+        connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG,
+                "overridden-group-id");
+        resetAndVerifySinkConnectorOffsets(connectorConfigs, connect.kafka());
+        // Ensure that the overridden consumer group ID was the one actually used
+        try (Admin admin = connect.kafka().createAdminClient()) {
+            Collection<ConsumerGroupListing> consumerGroups = admin.listConsumerGroups().all().get();
+            assertTrue(consumerGroups.stream().anyMatch(consumerGroupListing -> "overridden-group-id".equals(consumerGroupListing.groupId())));
+            assertTrue(consumerGroups.stream().noneMatch(consumerGroupListing -> SinkUtils.consumerGroupId(CONNECTOR_NAME).equals(consumerGroupListing.groupId())));
+        }
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
+        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
+
+        try (AutoCloseable ignored = kafkaCluster::stop) {
+            kafkaCluster.start();
+
+            Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+
+            resetAndVerifySinkConnectorOffsets(connectorConfigs, kafkaCluster);
+        }
+    }
+
+    private void resetAndVerifySinkConnectorOffsets(Map<String, String> connectorConfigs, EmbeddedKafkaCluster kafkaCluster) throws Exception {
+        int numPartitions = 3;
+        int numMessages = 10;
+        kafkaCluster.createTopic(TOPIC, numPartitions);
+
+        // Produce numMessages messages to each partition
+        for (int partition = 0; partition < numPartitions; partition++) {
+            for (int message = 0; message < numMessages; message++) {
+                kafkaCluster.produce(TOPIC, partition, "key", "value");
+            }
+        }
+        // Create sink connector
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
+
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, numMessages,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Reset the sink connector's offsets
+        String response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
+                "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
+
+        waitForEmptySinkConnectorOffsets(CONNECTOR_NAME);
+
+        // Reset the sink connector's offsets again while it is still in a STOPPED state and ensure that there is no error
+        response = connect.resetConnectorOffsets(CONNECTOR_NAME);
+        assertThat(response, containsString("The Connect framework-managed offsets for this connector have been reset successfully. " +
+                "However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."));
+
+        waitForEmptySinkConnectorOffsets(CONNECTOR_NAME);
+
+        // Resume the connector and expect its offsets to catch up to the latest offsets
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector tasks did not resume in time"
+        );
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, numPartitions, 10,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+    }
+
+    @Test
+    public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
+        connect.kafka().createTopic(TOPIC, 1);
+
+        // Produce 10 messages
+        for (int message = 0; message < 10; message++) {
+            connect.kafka().produce(TOPIC, 0, "key", "value");
+        }
+
+        // Configure a sink connector whose sink task blocks in its stop method
+        Map<String, String> connectorConfigs = new HashMap<>();
+        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
+        connectorConfigs.put(TOPICS_CONFIG, TOPIC);
+        connectorConfigs.put("block", "Task::stop");
+
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+                "Connector tasks did not start in time.");
+
+        waitForExpectedSinkConnectorOffsets(CONNECTOR_NAME, TOPIC, 1, 10,
+                "Sink connector consumer group offsets should catch up to the topic end offsets");
+
+        connect.stopConnector(CONNECTOR_NAME);
+
+        // Try to reset the offsets
+        ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME));
+        assertThat(e.getMessage(), containsString("zombie sink task"));
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsets() throws Exception {
+        resetAndVerifySourceConnectorOffsets(connect, baseSourceConnectorConfigs());
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsetsCustomOffsetsTopic() throws Exception {
+        Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
+        connectorConfigs.put(SourceConnectorConfig.OFFSETS_TOPIC_CONFIG, "custom-offsets-topic");
+        resetAndVerifySourceConnectorOffsets(connect, connectorConfigs);
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsetsDifferentKafkaClusterTargeted() throws Exception {
+        EmbeddedKafkaCluster kafkaCluster = new EmbeddedKafkaCluster(1, new Properties());
+
+        try (AutoCloseable ignored = kafkaCluster::stop) {
+            kafkaCluster.start();
+
+            Map<String, String> connectorConfigs = baseSourceConnectorConfigs();
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+            connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                    kafkaCluster.bootstrapServers());
+
+            resetAndVerifySourceConnectorOffsets(connect, connectorConfigs);
+        }
+    }
+
+    @Test
+    public void testResetSourceConnectorOffsetsExactlyOnceSupportEnabled() throws Exception {
+        Properties brokerProps = new Properties();
+        brokerProps.put("transaction.state.log.replication.factor", "1");
+        brokerProps.put("transaction.state.log.min.isr", "1");
+        workerProps.put(DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+
+        // This embedded Connect cluster will internally spin up its own embedded Kafka cluster
+        EmbeddedConnectCluster exactlyOnceSupportEnabledConnectCluster = new EmbeddedConnectCluster.Builder()
Review Comment:
Hah, I considered doing the same thing earlier but discarded the idea,
thinking it would add more noise to the review (since we'd be touching
existing test cases too). Thanks for the push in the right direction; I agree
that adding a couple of lines of code to tests here and there is worth it to
reduce test runtime bloat. I've made this change 👍
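For reference, the general shape of that change is something like the
following (a minimal sketch, assuming the JUnit 4 conventions this test class
already uses; the class and cluster names are illustrative, not the PR's
code):

```java
import java.util.Properties;

import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;

// Reuse one embedded Connect cluster (and its embedded Kafka cluster) across
// test cases instead of starting a fresh cluster per test.
public class SharedClusterTestBase {

    protected static EmbeddedConnectCluster connect;

    @BeforeClass
    public static void startCluster() {
        connect = new EmbeddedConnectCluster.Builder()
                .name("offsets-api-test-cluster")
                .brokerProps(new Properties())
                .build();
        connect.start();
    }

    @AfterClass
    public static void stopCluster() {
        connect.stop();
    }
}
```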
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map<String,
         Admin admin = adminFactory.apply(adminConfig);
         try {
-            List<KafkaFuture<Void>> adminFutures = new ArrayList<>();
-
-            Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
-                    .stream()
-                    .filter(entry -> entry.getValue() != null)
-                    .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-
-            if (!offsetsToAlter.isEmpty()) {
-                log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.",
-                        connName, offsetsToAlter);
-                AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs(
+            Map<TopicPartition, Long> offsetsToWrite;
+            if (isReset) {
+                offsetsToWrite = new HashMap<>();
+                ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(
                         (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter,
-                        alterConsumerGroupOffsetsOptions);
-
-                adminFutures.add(alterConsumerGroupOffsetsResult.all());
+                try {
+                    admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions)
+                            .partitionsToOffsetAndMetadata()
+                            .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                            .forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+                    log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}",
+                            connName, groupId, offsetsToWrite.keySet());
+                } catch (Exception e) {
+                    Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName);
+                    log.error("Failed to list offsets prior to resetting sink connector offsets", e);
+                    cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null);
+                    return;
+                }
+            } else {
+                offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets);
             }
-            Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
-                    .stream()
-                    .filter(entry -> entry.getValue() == null)
-                    .map(Map.Entry::getKey)
-                    .collect(Collectors.toSet());
-
-            if (!partitionsToReset.isEmpty()) {
-                log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
-                        connName, partitionsToReset);
-                DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs(
-                        (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-                DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset,
-                        deleteConsumerGroupOffsetsOptions);
+            boolean alterOffsetsResult;
+            try {
+                alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite);
+            } catch (UnsupportedOperationException e) {
+                throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " +
+                        "modification of offsets", e);
+            }
-
-                adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+            // This should only occur for an offset reset request when:
+            // 1. There was a prior attempt to reset offsets
+            // OR
+            // 2. No offsets have been committed yet
+            if (offsetsToWrite.isEmpty()) {
Review Comment:
Yeah, that makes sense. I've filed this follow-up Jira ticket so that we can
revisit it if necessary: https://issues.apache.org/jira/browse/KAFKA-15113
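For anyone reading along, the reason a repeated reset can be treated as a
no-op is that once the group's committed offsets are gone, listing them simply
returns an empty map. A standalone sketch of that check (the bootstrap address
is a placeholder, and the group ID follows the framework's `connect-<connector
name>` convention):

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ListOffsetsAfterReset {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // After a successful reset (or if no offsets were ever committed),
            // this map is empty, so a second reset finds nothing to delete.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("connect-my-sink-connector")
                            .partitionsToOffsetAndMetadata()
                            .get();
            System.out.println("Committed offsets: " + committed);
        }
    }
}
```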