yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1147332613


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,106 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }
+
+        if (ConnectUtils.isSinkConnector(connector)) {
+            log.debug("Fetching offsets for sink connector: {}", connName);
+            return sinkConnectorOffsets(connName, connector, connectorConfig);
+        } else {
+            log.debug("Fetching offsets for source connector: {}", connName);
+            return sourceConnectorOffsets(connName, connector, connectorConfig);
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @return the consumer group offsets for the sink connector
+     */
+    private ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig) {
+        return sinkConnectorOffsets(connName, connector, connectorConfig, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                          Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SINK);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId);

Review Comment:
   Thanks, that sounds good. I'd prefer to keep this as a potential follow-up
item in case we hear user complaints about it (although it'd probably be much
easier for users to just configure their connector's admin overrides
correctly). I'm not a big fan of using the consumer overrides for the admin
client, especially because we likely wouldn't want to do the same for the DLQ
admin client: that change wouldn't be strictly backward compatible, and having
the two admin clients diverge doesn't seem very appealing.
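
   To illustrate what "configure their connector's admin overrides correctly"
could look like in practice, here is a minimal sketch that copies a
connector's connection-related `consumer.override.*` settings to matching
`admin.override.*` keys, so the admin client used to list consumer group
offsets reaches the same cluster with the same credentials. The class name,
the helper methods, and the choice of which keys count as connection-related
are assumptions made for illustration and are not part of this PR; the
worker's client config override policy still has to permit the resulting
overrides.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Connect: mirrors connection-related
// consumer overrides into admin overrides in a connector config map.
final class AdminOverrideSketch {
    // Prefixes Kafka Connect recognizes for per-connector client overrides.
    static final String CONSUMER_PREFIX = "consumer.override.";
    static final String ADMIN_PREFIX = "admin.override.";

    // Assumption: only settings that decide which cluster is reached, and how
    // the client authenticates, need to match between consumer and admin.
    static boolean isConnectionSetting(String clientKey) {
        return clientKey.equals("bootstrap.servers")
                || clientKey.equals("security.protocol")
                || clientKey.startsWith("sasl.")
                || clientKey.startsWith("ssl.");
    }

    // Returns a copy of the config with admin overrides added where missing.
    // Admin overrides that the user set explicitly are left untouched.
    static Map<String, String> withAdminOverrides(Map<String, String> connectorConfig) {
        Map<String, String> result = new LinkedHashMap<>(connectorConfig);
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(CONSUMER_PREFIX)) {
                continue;
            }
            String clientKey = key.substring(CONSUMER_PREFIX.length());
            if (isConnectionSetting(clientKey)) {
                result.putIfAbsent(ADMIN_PREFIX + clientKey, entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("name", "my-sink");
        config.put("consumer.override.bootstrap.servers", "other-cluster:9092");
        config.put("consumer.override.security.protocol", "SASL_SSL");
        config.put("consumer.override.max.poll.records", "100");
        withAdminOverrides(config).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

   Doing this on the user side keeps the worker's behavior unchanged, which is
why it avoids the backward compatibility concern mentioned above.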



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java:
##########
@@ -65,6 +66,13 @@ public interface OffsetBackingStore {
      */
     Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback);
 
+    /**
+     * Get all the partitions for the specified connector.
+     * @param connectorName the name of the connector whose partitions are to be retrieved
+     * @return set of connector partitions
+     */
+    Set<Map<String, Object>> connectorPartitions(String connectorName);

Review Comment:
   I'm glad to hear that we're in agreement here. I've filed a follow-up
ticket to make the necessary changes:
https://issues.apache.org/jira/browse/KAFKA-14844



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.