yashmayya commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1167415436
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS }); } + /** + * Alter a connector's offsets. + * + * @param connName the name of the connector whose offsets are to be altered + * @param connectorConfig the connector's configurations + * @param offsets a mapping from partitions to offsets that need to be overwritten + * @param cb callback to invoke upon completion + */ + public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) { + String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); + Connector connector; + + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + connector = plugins.newConnector(connectorClassOrAlias); + if (ConnectUtils.isSinkConnector(connector)) { + log.debug("Altering consumer group offsets for sink connector: {}", connName); + alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); + } else { + log.debug("Altering offsets for source connector: {}", connName); + alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); + } + } + } + + /** + * Alter a sink connector's consumer group offsets. + * <p> + * Visible for testing. 
+ * + * @param connName the name of the sink connector whose offsets are to be altered + * @param connector an instance of the sink connector + * @param connectorConfig the sink connector's configuration + * @param offsets a mapping from topic partitions to offsets that need to be overwritten + * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader + * @param cb callback to invoke upon completion + */ + void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig, + Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) { + executor.submit(plugins.withClassLoader(connectorLoader, () -> { + try { + Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets); + boolean alterOffsetsResult; + try { + alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets); + } catch (UnsupportedOperationException e) { + throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " + + "modification of offsets", e); + } + + Class<? 
+ extends Connector> sinkConnectorClass = connector.getClass(); + Map<String, Object> adminConfig = adminConfigs( + connName, + "connector-worker-adminclient-" + connName, + config, + new SinkConnectorConfig(plugins, connectorConfig), + sinkConnectorClass, + connectorClientConfigOverridePolicy, + kafkaClusterId, + ConnectorType.SINK); + + SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig); + String groupId = (String) baseConsumerConfigs( + connName, "connector-consumer-", config, sinkConnectorConfig, + sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG); + + Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet() + .stream() + .filter(entry -> entry.getValue() != null) + .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet() + .stream() + .filter(entry -> entry.getValue() == null) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null); + + Admin admin = adminFactory.apply(adminConfig); + + try { Review Comment: While this whole section does seem somewhat convoluted, it offers the advantage of not attempting to delete partition offsets when the alter offsets attempt fails. The alternative would be to simply perform both operations concurrently (`alterConsumerGroupOffsets` and `deleteConsumerGroupOffsets`) and then react to the combined response (combining the returned futures using `KafkaFuture::allOf`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org