yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1159694956


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1247,6 +1259,217 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param connectorConfig the connector's configuration
+     *
+     * @return true if the connector plugin has implemented {@link org.apache.kafka.connect.sink.SinkConnector#alterOffsets(Map, Map)}
+     * / {@link org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)} and it returns true for the provided offsets,
+     * false otherwise
+     */
+    public boolean alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets,
+                                         Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                return alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                return alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @return true if the sink connector has implemented {@link org.apache.kafka.connect.sink.SinkConnector#alterOffsets(Map, Map)}
+     * and it returns true for the provided offsets, false otherwise
+     */
+    private boolean alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                              Map<Map<String, ?>, Map<String, ?>> offsets) {
+        return alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, Admin::create);
+    }
+
+    // Visible for testing; allows the admin client to be mocked
+    boolean alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Function<Map<String, Object>, Admin> adminFactory) {
+
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+        Timer timer = time.timer(ALTER_OFFSETS_TIMEOUT_MS);
+        boolean alterOffsetsResult;
+        try {
+            alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+        } catch (UnsupportedOperationException e) {
+            throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                    "modification of offsets", e);
+        }
+        timer.update();
+
+        Class<? extends Connector> sinkConnectorClass = connector.getClass();
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                sinkConnectorClass,
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SINK);
+
+        try (Admin admin = adminFactory.apply(adminConfig)) {
+            SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+            String groupId = (String) baseConsumerConfigs(
+                    connName, "connector-consumer-", config, sinkConnectorConfig,
+                    sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+            Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                    .stream()
+                    .filter(entry -> entry.getValue() != null)
+                    .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+            if (!offsetsToAlter.isEmpty()) {
+                AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, offsetsToAlter);
+                try {
+                    log.debug("Committing the following consumer group topic partition offsets using an admin client for sink connector {}: {}.",
+                            connName, offsetsToAlter);
+                    alterConsumerGroupOffsetsResult.all().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+                } catch (ExecutionException e) {
+                    // TODO: Handle different exception cause types to surface more fine-grained errors here?
+                    throw new ConnectException("Failed to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet() + " for connector "
+                            + connName, e.getCause());
+                } catch (TimeoutException e) {
+                    throw new ConnectException("Timed out while attempting to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet()
+                            + " for connector " + connName, e);
+                } catch (InterruptedException e) {
+                    throw new ConnectException("Unexpectedly interrupted while attempting to alter consumer group offsets for topic partitions " + offsetsToAlter.keySet()
+                            + " for connector " + connName, e);
+                }
+                timer.update();
+            }
+
+            Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                    .stream()
+                    .filter(entry -> entry.getValue() == null)
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toSet());
+
+            if (!partitionsToReset.isEmpty()) {
+                DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, partitionsToReset);
+                try {
+                    log.debug("Deleting the consumer group offsets for the following topic partitions using an admin client for sink connector {}: {}.",
+                            connName, partitionsToReset);
+                    deleteConsumerGroupOffsetsResult.all().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+                } catch (ExecutionException e) {
+                    // TODO: Handle different exception cause types to surface more fine-grained errors here?
+                    throw new ConnectException("Failed to delete consumer group offsets for topic partitions " + partitionsToReset + " for connector "
+                            + connName, e.getCause());
+                } catch (TimeoutException e) {
+                    throw new ConnectException("Timed out while attempting to delete consumer group offsets for topic partitions " + partitionsToReset
+                            + " for connector " + connName, e);
+                } catch (InterruptedException e) {
+                    throw new ConnectException("Unexpectedly interrupted while attempting to delete consumer group offsets for topic partitions " + partitionsToReset
+                            + " for connector " + connName, e);
+                }
+            }
+            return alterOffsetsResult;
+        }
+    }
+
+    /**
+     * Alter a source connector's offsets.
+     * @param connName the name of the source connector whose offsets are to be altered
+     * @param connector an instance of the source connector
+     * @param connectorConfig the source connector's configuration
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @return true if the source connector has implemented {@link org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)}
+     * and it returns true for the provided offsets, false otherwise
+     */
+    private boolean alterSourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                                Map<Map<String, ?>, Map<String, ?>> offsets) {
+        SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig, config.topicCreationEnable());
+        Map<String, Object> producerProps = config.exactlyOnceSourceEnabled()
+                ? exactlyOnceSourceTaskProducerConfigs(new ConnectorTaskId(connName, 0), config, sourceConfig,

Review Comment:
   It would be cleaner to use a connector-specific producer transactional ID here, but that would
   require additional ACLs beyond what is described in
   [KIP-618](https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors#KIP618:ExactlyOnceSupportforSourceConnectors-Producer)
   (unless a wildcard prefix is used). @C0urante, would it be possible to amend
   [KIP-875](https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect)
   and document this additional required ACL for altering / resetting offsets for a source
   connector when exactly-once support is enabled on the worker?
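   
   For reference, and purely as an illustrative sketch (the bootstrap server, the principal
   `User:connect-worker`, and the `connect-cluster-` prefix below are placeholders rather than the
   values a real worker would use), the extra grant would be a prefixed `Write` ACL on the
   `TransactionalId` resource, e.g. via the admin client:
   
   ```java
   import java.util.Collections;
   import java.util.Properties;
   
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.common.acl.AccessControlEntry;
   import org.apache.kafka.common.acl.AclBinding;
   import org.apache.kafka.common.acl.AclOperation;
   import org.apache.kafka.common.acl.AclPermissionType;
   import org.apache.kafka.common.resource.PatternType;
   import org.apache.kafka.common.resource.ResourcePattern;
   import org.apache.kafka.common.resource.ResourceType;
   
   public class GrantConnectTxnIdAcl {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   
           try (Admin admin = Admin.create(props)) {
               // Prefixed WRITE ACL on the TransactionalId resource, so that every
               // transactional ID starting with the given prefix is covered.
               // Both the "connect-cluster-" prefix and the principal are placeholders.
               AclBinding binding = new AclBinding(
                       new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "connect-cluster-", PatternType.PREFIXED),
                       new AccessControlEntry("User:connect-worker", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
               admin.createAcls(Collections.singleton(binding)).all().get();
           }
       }
   }
   ```
   
   The same grant could of course be made with the `kafka-acls` CLI; the relevant point is just that
   the resource type is `TransactionalId` with a `PREFIXED` pattern covering the worker's
   transactional IDs.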



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
