mimaison commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1143576130


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java:
##########
@@ -103,6 +103,9 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
     public void stop() {
         log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName);
         connectorHandle.recordConnectorStop();
+        if (Boolean.parseBoolean(commonConfigs.getOrDefault("connector.stop.inject.error", "false"))) {
+            throw new RuntimeException("Injecting errors during connector start");

Review Comment:
   `during connector start` -> `during connector stop`



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -245,6 +245,14 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba
      */
     void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> cb);
 
+    /**
+     * Stop the conector. This call will asynchronously suspend processing by the connector and all

Review Comment:
   `connector`



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1090,6 +1090,39 @@ public void putConnectorConfig(final String connName, final Map<String, String>
         );
     }
 
+    @Override
+    public void stopConnector(final String connName, final Callback<Void> callback) {
+        log.trace("Submitting request to transition connector {} to STOPPED state", connName);
+
+        addRequest(
+                () -> {
+                    refreshConfigSnapshot(workerSyncTimeoutMs);
+                    if (!configState.contains(connName))
+                        throw new NotFoundException("Unknown connector " + connName);
+
+                    // We only allow the leader to handle this request since it involves writing task configs to the config topic
+                    if (!isLeader()) {
+                        callback.onCompletion(new NotLeaderException("Only the leader can transition connectors to the STOPPED state.", leaderUrl()), null);
+                        return null;
+                    }
+
+                    // TODO: We may want to add a new ConfigBackingStore method for stopping a connector so that

Review Comment:
   Should we create a ticket for this TODO?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -221,28 +223,44 @@ public boolean isRunning() {
     }
 
     @SuppressWarnings("fallthrough")
-    private void pause() {
+    private void stop(boolean paused) {
+        State newState = paused ? State.PAUSED : State.STOPPED;
         try {
-            switch (state) {
-                case STOPPED:
-                    return;
+            if ((state == State.STOPPED || state == State.PAUSED) && state == newState) {

Review Comment:
   As `newState` is either `PAUSED` or `STOPPED`, would `state == newState` be enough here?
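
   To illustrate the question, here is a minimal standalone sketch that compares the guard from the diff with the shorter form over every state. The `State` enum below is a hypothetical stand-in for the one in `WorkerConnector` (its exact set of values is an assumption), and `StopGuardCheck`, `originalGuard`, `simplifiedGuard`, and `guardsAgree` are names made up for this example, not part of the PR.

```java
public class StopGuardCheck {
    // Hypothetical stand-in for WorkerConnector's State enum; the real values may differ.
    enum State { INIT, STARTED, PAUSED, STOPPED, FAILED }

    // The guard as written in the diff.
    static boolean originalGuard(State state, State newState) {
        return (state == State.STOPPED || state == State.PAUSED) && state == newState;
    }

    // The shorter form suggested in the review comment.
    static boolean simplifiedGuard(State state, State newState) {
        return state == newState;
    }

    // Compares both guards for every current state and both possible target states.
    static boolean guardsAgree() {
        for (boolean paused : new boolean[] {true, false}) {
            // Same derivation as in the diff: the target is always PAUSED or STOPPED.
            State newState = paused ? State.PAUSED : State.STOPPED;
            for (State state : State.values()) {
                if (originalGuard(state, newState) != simplifiedGuard(state, newState)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("guards agree: " + guardsAgree());
    }
}
```

   Because `newState` can only be `PAUSED` or `STOPPED`, `state == newState` already implies that `state` is one of those two values, so the extra disjunction should not change the outcome under these assumptions.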


