yashmayya commented on code in PR #13424: URL: https://github.com/apache/kafka/pull/13424#discussion_r1150371852
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1090,6 +1090,40 @@ public void putConnectorConfig(final String connName, final Map<String, String>
         );
     }
 
+    @Override
+    public void stopConnector(final String connName, final Callback<Void> callback) {
+        log.trace("Submitting request to transition connector {} to STOPPED state", connName);
+
+        addRequest(
+                () -> {
+                    if (!configState.contains(connName))
+                        throw new NotFoundException("Unknown connector " + connName);
+
+                    // We only allow the leader to handle this request since it involves writing task configs to the config topic
+                    if (!isLeader()) {
+                        callback.onCompletion(new NotLeaderException("Only the leader can transition connectors to the STOPPED state.", leaderUrl()), null);
+                        return null;
+                    }
+
+                    // TODO: We may want to add a new ConfigBackingStore method for stopping a connector so that
+                    //       these operations can be performed in a single (possibly-atomic) call
+                    // We write the task configs first since, if we fail between then and writing the target state, the
+                    // cluster is still kept in a healthy state. A RUNNING connector with zero tasks is acceptable (although,
+                    // if the connector is reassigned during the ensuing rebalance, it is likely that it will immediately generate
+                    // a non-empty set of task configs). A STOPPED connector with a non-empty set of tasks is less acceptable
+                    // and likely to confuse users.
+                    writeTaskConfigs(connName, Collections.emptyList());

Review Comment:
   I'm not sure I follow? I wasn't suggesting moving away from how task configs are updated in or read from the config topic. I was just wondering whether `KafkaConfigBackingStore` could implement the `removeTaskConfigs` method to do the same thing we're currently doing (writing an empty list of task configs).
Although on second thought, the `removeTaskConfigs` method itself doesn't seem to add much in the `MemoryConfigBackingStore` implementation either and can be easily replaced by `putTaskConfigs` with an empty list of task configs.
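
To illustrate the point about `removeTaskConfigs`, here is a minimal sketch using a hypothetical `ToyConfigStore`. It is a simplified stand-in written for this comment, not the actual `MemoryConfigBackingStore` or the `ConfigBackingStore` interface, and it assumes task configs are simply held as a per-connector list. Under that assumption, removing task configs is the same as putting an empty list:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy store for illustration only; NOT the real Kafka Connect class.
class ToyConfigStore {
    // Assumption: task configs are kept as one list of config maps per connector.
    private final Map<String, List<Map<String, String>>> taskConfigs = new HashMap<>();

    // Replaces the full set of task configs for the connector.
    void putTaskConfigs(String connector, List<Map<String, String>> configs) {
        taskConfigs.put(connector, new ArrayList<>(configs));
    }

    // Expressed purely in terms of putTaskConfigs with an empty list.
    void removeTaskConfigs(String connector) {
        putTaskConfigs(connector, Collections.emptyList());
    }

    // Number of task configs currently stored for the connector (0 if unknown).
    int taskCount(String connector) {
        return taskConfigs.getOrDefault(connector, Collections.emptyList()).size();
    }

    public static void main(String[] args) {
        ToyConfigStore store = new ToyConfigStore();
        store.putTaskConfigs("my-connector", Arrays.asList(
                Collections.singletonMap("task.id", "0"),
                Collections.singletonMap("task.id", "1")));
        System.out.println("before: " + store.taskCount("my-connector"));
        store.removeTaskConfigs("my-connector");
        System.out.println("after: " + store.taskCount("my-connector"));
    }
}
```

If the real stores behave like this sketch, a separate removal method gives callers nothing that `putTaskConfigs(connName, Collections.emptyList())` doesn't already provide.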