[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API
yashmayya commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1238244318 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1320,89 +1317,192 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map> adminFutures = new ArrayList<>(); - -Map offsetsToAlter = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() != null) -.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue(; - -if (!offsetsToAlter.isEmpty()) { -log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", -connName, offsetsToAlter); -AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( -(int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, -alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); +Map offsetsToWrite; +if (isReset) { +offsetsToWrite = new HashMap<>(); +ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions() +.timeoutMs((int) timer.remainingMs()); +try { +admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) +.partitionsToOffsetAndMetadata() +.get(timer.remainingMs(), TimeUnit.MILLISECONDS) +.forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + +timer.update(); +log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", +connName, groupId, offsetsToWrite.keySet()); +} catch (Exception e) { +Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); +log.error("Failed to list offsets prior to resetting offsets for sink connector {}", connName, e); +cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting offsets for sink connector " + connName, e), null); +return; +} +} else { +offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } -Set partitionsToReset = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() == null) -.map(Map.Entry::getKey) -.collect(Collectors.toSet()); - -if (!partitionsToReset.isEmpty()) { -log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", -connName, partitionsToReset); -DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( -(int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, -deleteConsumerGroupOffsetsOptions); - - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); +boolean alterOffsetsResult; +try { +alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); +} catch (UnsupportedOperationException e) { +log.error("Failed to modify offsets for connector {} because it doesn't support external modification of offsets", +connName, e); +throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + +"modification of offsets", e); } +
[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API
yashmayya commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1224138055 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map> adminFutures = new ArrayList<>(); - -Map offsetsToAlter = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() != null) -.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue(; - -if (!offsetsToAlter.isEmpty()) { -log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", -connName, offsetsToAlter); -AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( +Map offsetsToWrite; +if (isReset) { +offsetsToWrite = new HashMap<>(); +ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, -alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); +try { +admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) +.partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) +.forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + +log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", +connName, groupId, offsetsToWrite.keySet()); +} catch (Exception e) { +Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); +log.error("Failed to list offsets prior to resetting sink connector offsets", e); +cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null); +return; +} +} else { +offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } -Set partitionsToReset = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() == null) -.map(Map.Entry::getKey) -.collect(Collectors.toSet()); - -if (!partitionsToReset.isEmpty()) { -log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", -connName, partitionsToReset); -DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( -(int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, -deleteConsumerGroupOffsetsOptions); +boolean alterOffsetsResult; +try { +alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); +} catch (UnsupportedOperationException e) { +throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + +"modification of offsets", e); +} - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); +// This should only occur for an offset reset request when: +// 1. There was a prior attempt to reset offsets +// OR +// 2. No offsets have been committed yet +if (offsetsToWrite.isEmpty()) { Review Comment: I've made this change but it also
[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API
yashmayya commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1224138055 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map> adminFutures = new ArrayList<>(); - -Map offsetsToAlter = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() != null) -.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue(; - -if (!offsetsToAlter.isEmpty()) { -log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", -connName, offsetsToAlter); -AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( +Map offsetsToWrite; +if (isReset) { +offsetsToWrite = new HashMap<>(); +ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, -alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); +try { +admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) +.partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) +.forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + +log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", +connName, groupId, offsetsToWrite.keySet()); +} catch (Exception e) { +Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); +log.error("Failed to list offsets prior to resetting sink connector offsets", e); +cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null); +return; +} +} else { +offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } -Set partitionsToReset = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() == null) -.map(Map.Entry::getKey) -.collect(Collectors.toSet()); - -if (!partitionsToReset.isEmpty()) { -log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", -connName, partitionsToReset); -DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( -(int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, -deleteConsumerGroupOffsetsOptions); +boolean alterOffsetsResult; +try { +alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); +} catch (UnsupportedOperationException e) { +throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + +"modification of offsets", e); +} - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); +// This should only occur for an offset reset request when: +// 1. There was a prior attempt to reset offsets +// OR +// 2. No offsets have been committed yet +if (offsetsToWrite.isEmpty()) { Review Comment: I've made this change but it also
[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API
yashmayya commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1222869378 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1268,39 +1270,55 @@ public void alterConnectorOffsets(String connName, Map connector connector = plugins.newConnector(connectorClassOrAlias); if (ConnectUtils.isSinkConnector(connector)) { log.debug("Altering consumer group offsets for sink connector: {}", connName); -alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); +modifySinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); } else { log.debug("Altering offsets for source connector: {}", connName); -alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); +modifySourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); +} +} +} + +/** + * Reset a connector's offsets. + * + * @param connName the name of the connector whose offsets are to be reset + * @param connectorConfig the connector's configurations + * @param cb callback to invoke upon completion + */ +public void resetConnectorOffsets(String connName, Map connectorConfig, Callback cb) { Review Comment: > If we're worried about accidentally introducing a nasty bug where an empty-bodied alter request causes an unintentional reset, we can add an integration test for that case. Yeah, this was exactly my worry and the reason why I'd kept them separated. Based on your feedback, I've added a new integration test and also moved the second level check to `AbstractHerder::alterConnectorOffsets` (so that we can consolidate the two methods in the `Worker`). While we could in theory do a similar consolidation for the `Herder` methods, I think it's probably a better idea to have cleaner and more well-defined interface methods there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API
yashmayya commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1221937078 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1587,10 +1595,11 @@ private boolean alterConnectorOffsetsChecks(String connName, Callback c // If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above), // we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop). // Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled -// naturally because requests to alter consumer group offsets will fail if there are still active members in the group. +// naturally because requests to alter consumer group offsets / delete consumer groups will fail if there are still active members +// in the group. if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) { -callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. This " + -"can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null); +callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified externally. " + Review Comment: I think just "modified" should be clear enough to users since this is synchronously surfaced to the user via the alter / reset offsets REST API response ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map> adminFutures = new ArrayList<>(); - -Map offsetsToAlter = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() != null) -.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue(; - -if (!offsetsToAlter.isEmpty()) { -log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", -connName, offsetsToAlter); -AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( +Map offsetsToWrite; +if (isReset) { +offsetsToWrite = new HashMap<>(); +ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, -alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); +try { +admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) +.partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) +.forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + +log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", +connName, groupId, offsetsToWrite.keySet()); +} catch (Exception e) { +Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); +log.error("Failed to list offsets prior to resetting sink connector offsets", e); +cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null); +return; +} +} else { +offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } -Set partitionsToReset = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() == null) -.map(Map.Entry::getKey) -.collect(Collectors.toSet()); - -
[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API
yashmayya commented on code in PR #13818: URL: https://github.com/apache/kafka/pull/13818#discussion_r1219524567 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, Connector connector, Map> adminFutures = new ArrayList<>(); - -Map offsetsToAlter = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() != null) -.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue(; - -if (!offsetsToAlter.isEmpty()) { -log.debug("Committing the following consumer group offsets using an admin client for sink connector {}: {}.", -connName, offsetsToAlter); -AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions = new AlterConsumerGroupOffsetsOptions().timeoutMs( +Map offsetsToWrite; +if (isReset) { +offsetsToWrite = new HashMap<>(); +ListConsumerGroupOffsetsOptions listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs( (int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter, -alterConsumerGroupOffsetsOptions); - - adminFutures.add(alterConsumerGroupOffsetsResult.all()); +try { +admin.listConsumerGroupOffsets(groupId, listConsumerGroupOffsetsOptions) +.partitionsToOffsetAndMetadata() + .get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) +.forEach((topicPartition, offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null)); + +log.debug("Found the following topic partitions (to reset offsets) for sink connector {} and consumer group ID {}: {}", +connName, groupId, offsetsToWrite.keySet()); +} catch (Exception e) { +Utils.closeQuietly(admin, "Offset reset admin for sink connector " + connName); +log.error("Failed to list offsets prior to resetting sink connector offsets", e); +cb.onCompletion(new ConnectException("Failed to list offsets prior to resetting sink connector offsets", e), null); +return; +} +} else { +offsetsToWrite = SinkUtils.parseSinkConnectorOffsets(offsets); } -Set partitionsToReset = parsedOffsets.entrySet() -.stream() -.filter(entry -> entry.getValue() == null) -.map(Map.Entry::getKey) -.collect(Collectors.toSet()); - -if (!partitionsToReset.isEmpty()) { -log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.", -connName, partitionsToReset); -DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions = new DeleteConsumerGroupOffsetsOptions().timeoutMs( -(int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); -DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset, -deleteConsumerGroupOffsetsOptions); +boolean alterOffsetsResult; +try { +alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, offsetsToWrite); +} catch (UnsupportedOperationException e) { +throw new ConnectException("Failed to modify offsets for connector " + connName + " because it doesn't support external " + +"modification of offsets", e); +} - adminFutures.add(deleteConsumerGroupOffsetsResult.all()); +// This should only occur for an offset reset request when: +// 1. There was a prior attempt to reset offsets +// OR +// 2. No offsets have been committed yet +if (offsetsToWrite.isEmpty()) { Review Comment: I'm wondering whether we should go