yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1147332613
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,106 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }
+
+        if (ConnectUtils.isSinkConnector(connector)) {
+            log.debug("Fetching offsets for sink connector: {}", connName);
+            return sinkConnectorOffsets(connName, connector, connectorConfig);
+        } else {
+            log.debug("Fetching offsets for source connector: {}", connName);
+            return sourceConnectorOffsets(connName, connector, connectorConfig);
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @return the consumer group offsets for the sink connector
+     */
+    private ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig) {
+        return sinkConnectorOffsets(connName, connector, connectorConfig, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                          Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SINK);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId);

Review Comment:
Thanks, that sounds good. I'd prefer to keep this as a potential follow-up item in case we hear user complaints about it (although it would probably be much easier for users to just configure their connector's admin overrides correctly). I'm not a big fan of using the consumer overrides for the admin client, especially because we likely wouldn't want to do the same for the DLQ admin client (that wouldn't be strictly backward compatible), and the resulting divergence between the two admin clients doesn't seem very appealing.
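To make the trade-off above concrete, here is a minimal Java sketch (not Connect's actual implementation) of how the per-connector `admin.override.` and `consumer.override.` prefixes form two independent sets of client settings. The class `AdminOverrideSketch` and its helpers `overridesFor` and `consumerOnlyOverrides` are hypothetical names made up for this illustration, and the connector name and broker address are placeholders; only the two prefixes and the `bootstrap.servers` key come from Kafka itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: a hypothetical helper class, not part of Kafka Connect.
class AdminOverrideSketch {
    // Prefixes for per-connector client overrides in a connector's configuration.
    static final String ADMIN_OVERRIDE_PREFIX = "admin.override.";
    static final String CONSUMER_OVERRIDE_PREFIX = "consumer.override.";

    // Collect the entries that start with the given prefix, with the prefix stripped.
    static Map<String, String> overridesFor(Map<String, String> connectorConfig, String prefix) {
        Map<String, String> overrides = new HashMap<>();
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                overrides.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return overrides;
    }

    // Client settings overridden for the consumer but not for the admin client.
    static Set<String> consumerOnlyOverrides(Map<String, String> connectorConfig) {
        Set<String> keys = new TreeSet<>(overridesFor(connectorConfig, CONSUMER_OVERRIDE_PREFIX).keySet());
        keys.removeAll(overridesFor(connectorConfig, ADMIN_OVERRIDE_PREFIX).keySet());
        return keys;
    }

    public static void main(String[] args) {
        Map<String, String> connectorConfig = new HashMap<>();
        connectorConfig.put("name", "my-sink"); // placeholder connector name
        connectorConfig.put("consumer.override.bootstrap.servers", "other-cluster:9092"); // placeholder address

        // Only the consumer is redirected; the admin client keeps its own settings.
        System.out.println(consumerOnlyOverrides(connectorConfig)); // prints [bootstrap.servers]

        // Setting the matching admin override brings the two clients back in line.
        connectorConfig.put("admin.override.bootstrap.servers", "other-cluster:9092");
        System.out.println(consumerOnlyOverrides(connectorConfig)); // prints []
    }
}
```

In the first case the connector's admin client would presumably still be built from the worker-level settings, which is the misconfiguration discussed here; adding the matching `admin.override.` entry is the user-side fix preferred in this comment.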
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java:
##########
@@ -65,6 +66,13 @@ public interface OffsetBackingStore {
      */
     Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback);
+    /**
+     * Get all the partitions for the specified connector.
+     * @param connectorName the name of the connector whose partitions are to be retrieved
+     * @return set of connector partitions
+     */
+    Set<Map<String, Object>> connectorPartitions(String connectorName);

Review Comment:
I'm glad to hear that we're in agreement here, and I've filed this follow-up ticket to make the necessary changes: https://issues.apache.org/jira/browse/KAFKA-14844