yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1167415436


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be 
altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> 
connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> 
offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+                alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", 
connName);
+                alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be 
altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to 
be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, 
Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> 
offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = 
connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, 
sinkConnectorConfig,
+                        sinkConnectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = 
parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), new 
OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = 
parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = 
KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {

Review Comment:
   While this whole bit does seem a bit convoluted, it offers the advantage of 
not trying to delete partition offsets when the alter offsets attempt fails. 
The alternative could be to simply do both the operations concurrently 
(`alterConsumerGroupOffsets` and `deleteConsumerGroupOffsets`), and then react 
to the combined response (combining the returned futures using 
`KafkaFuture::allOf`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to