yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1159694956
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1259,217 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param connectorConfig the connector's configurations
+     *
+     * @return true if the connector plugin has implemented {@link org.apache.kafka.connect.sink.SinkConnector#alterOffsets(Map, Map)}
+     * / {@link org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)} and it returns true for the provided offsets,
+     * false otherwise
+     *
+     */
+    public boolean alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets,
+                                         Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                return alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                return alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @return true if the sink connector has implemented {@link org.apache.kafka.connect.sink.SinkConnector#alterOffsets(Map, Map)}
+     * and it returns true for the provided offsets, false otherwise
+     */
+    private boolean alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                              Map<Map<String, ?>, Map<String, ?>> offsets) {
+        return alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, Admin::create);
+    }
+
+    // Visible for testing; allows mocking the admin client for testing
+    boolean alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Function<Map<String, Object>, Admin> adminFactory) {
+
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+        Timer timer = time.timer(ALTER_OFFSETS_TIMEOUT_MS);
+        boolean alterOffsetsResult;
+        try {
+            alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+        } catch (UnsupportedOperationException e) {
+            throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                    "modification of offsets", e);
+        }
+        timer.update();
+
+        Class<? extends Connector> sinkConnectorClass = connector.getClass();
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                sinkConnectorClass,
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SINK);
+
+        try (Admin admin = adminFactory.apply(adminConfig)) {
+            SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+            String groupId = (String) baseConsumerConfigs(
+                    connName, "connector-consumer-", config, sinkConnectorConfig,
+                    sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+            Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                    .stream()
+                    .filter(entry -> entry.getValue() != null)
+                    .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+            if (!offsetsToAlter.isEmpty()) {
+                AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter);
+                try {
+                    log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",
+                            connName, offsetsToAlter);
+                    alterConsumerGroupOffsetsResult.all().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+                } catch (ExecutionException e) {
+                    // TODO: Handle different exception cause types to surface more fine-grained errors here?
+                    throw new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for connector " +
+                            connName, e.getCause());
+                } catch (TimeoutException e) {
+                    throw new ConnectException("Timed out while attempting to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() +
+                            " for connector" + connName, e);
+                } catch (InterruptedException e) {
+                    throw new ConnectException("Unexpectedly interrupted while attempting to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() +
+                            " for connector" + connName, e);
+                }
+                timer.update();
+            }
+
+            Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                    .stream()
+                    .filter(entry -> entry.getValue() == null)
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toSet());
+
+            if (!partitionsToReset.isEmpty()) {
+                DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset);
+                try {
+                    log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                            connName, partitionsToReset);
+                    deleteConsumerGroupOffsetsResult.all().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+                } catch (ExecutionException e) {
+                    // TODO: Handle different exception cause types to surface more fine-grained errors here?
+                    throw new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector " +
+                            connName, e.getCause());
+                } catch (TimeoutException e) {
+                    throw new ConnectException("Timed out while attempting to delete consumer group offsets for topic partitions " + partitionsToReset +
+                            " for connector" + connName, e);
+                } catch (InterruptedException e) {
+                    throw new ConnectException("Unexpectedly interrupted while attempting to delete consumer group offsets for topic partitions " + partitionsToReset +
+                            " for connector" + connName, e);
+                }
+            }
+            return alterOffsetsResult;
+        }
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @return true if the source connector has implemented {@link org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)}
+     * and it returns true for the provided offsets, false otherwise
+     */
+    private boolean alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                                Map<Map<String, ?>, Map<String, ?>> offsets) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,

Review Comment:
It would be cleaner to use a connector-specific producer transactional ID here, but that would require additional ACLs beyond what is described in [KIP-618](https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors#KIP618:ExactlyOnceSupportforSourceConnectors-Producer) (unless a wildcard prefix is used). @C0urante would it be possible to amend [KIP-875](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect) and document this additional required ACL for altering / resetting offsets for a source connector when exactly-once support is enabled on the worker?
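For illustration, the kind of grant I have in mind would look roughly like the sketch below when expressed through the admin client. This is only a sketch: the principal, host, bootstrap servers, and the `connect-cluster-` transactional ID prefix are placeholders, not values the runtime actually uses. A prefixed (or wildcard) grant like this is what would let a single ACL cover connector-specific transactional IDs without enumerating every connector:

```java
// Hypothetical sketch only: grants WRITE (which implies DESCRIBE) on every
// transactional ID starting with a chosen prefix, so connector-specific
// transactional IDs would be covered by a single prefixed ACL.
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantPrefixedTransactionalIdAcl {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap servers
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Prefixed pattern: matches any transactional ID beginning with
            // "connect-cluster-" (placeholder prefix)
            ResourcePattern transactionalIds = new ResourcePattern(
                    ResourceType.TRANSACTIONAL_ID, "connect-cluster-", PatternType.PREFIXED);

            // Placeholder worker principal; allow WRITE on the matched transactional IDs
            AccessControlEntry allowWrite = new AccessControlEntry(
                    "User:connect-worker", "*", AclOperation.WRITE, AclPermissionType.ALLOW);

            // Create the ACL binding and wait for the request to complete
            admin.createAcls(Collections.singletonList(new AclBinding(transactionalIds, allowWrite)))
                    .all()
                    .get();
        }
    }
}
```

The CLI equivalent would be something along the lines of `kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:connect-worker --operation Write --transactional-id connect-cluster- --resource-pattern-type prefixed` (again with placeholder values).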