C0urante commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1150919689


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1090,6 +1090,40 @@ public void putConnectorConfig(final String connName, 
final Map<String, String>
         );
     }
 
+    @Override
+    public void stopConnector(final String connName, final Callback<Void> 
callback) {
+        log.trace("Submitting request to transition connector {} to STOPPED 
state", connName);
+
+        addRequest(
+                () -> {
+                    if (!configState.contains(connName))
+                        throw new NotFoundException("Unknown connector " + 
connName);
+
+                    // We only allow the leader to handle this request since 
it involves writing task configs to the config topic
+                    if (!isLeader()) {
+                        callback.onCompletion(new NotLeaderException("Only the 
leader can transition connectors to the STOPPED state.", leaderUrl()), null);
+                        return null;
+                    }
+
+                    // TODO: We may want to add a new ConfigBackingStore 
method for stopping a connector so that
+                    //       these operations can be performed in a single 
(possibly-atomic) call
+                    // We write the task configs first since, if we fail 
between then and writing the target state, the
+                    // cluster is still kept in a healthy state. A RUNNING 
connector with zero tasks is acceptable (although,
+                    // if the connector is reassigned during the ensuing 
rebalance, it is likely that it will immediately generate
+                    // a non-empty set of task configs). A STOPPED connector 
with a non-empty set of tasks is less acceptable
+                    // and likely to confuse users.
+                    writeTaskConfigs(connName, Collections.emptyList());

Review Comment:
   Ahh sorry, I misunderstood--your suggestion is much more reasonable than 
what I thought was being proposed.
   
   Thinking about it, I kind of prefer the existing API. It's a bit out-there 
but there might come a time when we want to distinguish between completely 
wiping all history of task configs for a connector from the config topic, and 
publishing an empty set of task configs (sort of like the difference between a 
null `List` and an empty one in Java).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to