yashmayya commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1159264066
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1090,6 +1090,39 @@ public void putConnectorConfig(final String connName,
final Map<String, String>
);
}
+ @Override
+ public void stopConnector(final String connName, final Callback<Void>
callback) {
+ log.trace("Submitting request to transition connector {} to STOPPED
state", connName);
+
+ addRequest(
+ () -> {
+ refreshConfigSnapshot(workerSyncTimeoutMs);
+ if (!configState.contains(connName))
+ throw new NotFoundException("Unknown connector " +
connName);
+
+ // We only allow the leader to handle this request since
it involves writing task configs to the config topic
+ if (!isLeader()) {
+ callback.onCompletion(new NotLeaderException("Only the
leader can transition connectors to the STOPPED state.", leaderUrl()), null);
+ return null;
+ }
+
+ // TODO: We may want to add a new ConfigBackingStore
method for stopping a connector so that
Review Comment:
Currently, the `KafkaConfigBackingStore` uses a transactional producer only
when exactly once support is enabled on the worker. Maybe Chris' concern was
that currently we'll only be able to do the tasks config + target state write
atomically when exactly once support is enabled on the worker? 🤔
Although that is already the case today for other operations that require
multiple writes to the config topic like `putTaskConfigs` and
`removeConnectorConfig`, so I'm not sure whether or not that was his concern.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]