yashmayya commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1147189361


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -245,6 +245,14 @@ default void validateConnectorConfig(Map<String, String> 
connectorConfig, Callba
      */
     void restartConnectorAndTasks(RestartRequest request, 
Callback<ConnectorStateInfo> cb);
 
+    /**
+     * Stop the conector. This call will asynchronously suspend processing by 
the connector and all
+     * of its tasks.

Review Comment:
   Thanks, looks good!



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -221,28 +223,44 @@ public boolean isRunning() {
     }
 
     @SuppressWarnings("fallthrough")
-    private void pause() {
+    private void stop(boolean paused) {
+        State newState = paused ? State.PAUSED : State.STOPPED;
         try {
-            switch (state) {
-                case STOPPED:
-                    return;
+            if ((state == State.STOPPED || state == State.PAUSED) && state == 
newState) {
+                // Already in the desired state
+                return;
+            }
 
-                case STARTED:
-                    connector.stop();
-                    // fall through
+            if (state == State.STARTED) {
+                connector.stop();
+            }
 
-                case INIT:
-                    statusListener.onPause(connName);
-                    this.state = State.STOPPED;
-                    break;
+            if (state == State.FAILED && newState != State.STOPPED) {
+                throw new IllegalArgumentException("Cannot transition to 
non-stopped state when connector has already failed");
+            }
 
-                default:
-                    throw new IllegalArgumentException("Cannot pause connector 
in state " + state);
+            if (paused) {
+                statusListener.onPause(connName);
+            } else {
+                statusListener.onStop(connName);
             }
+
+            this.state = newState;
         } catch (Throwable t) {
-            log.error("{} Error while shutting down connector", this, t);
-            statusListener.onFailure(connName, t);
-            this.state = State.FAILED;
+            log.error("{} Error while {} connector", this, paused ? "pausing" 
: "stopping", t);
+            if (paused) {
+                statusListener.onFailure(connName, t);
+                this.state = State.FAILED;
+            } else {
+                // We say the connector is STOPPED even if it fails at this 
point
+                this.state = State.STOPPED;
+                // One more try to make sure the status is updated correctly
+                try {
+                    statusListener.onStop(connName);
+                } catch (Throwable t2) {
+                    log.error("{} Error during failover attempt to stop 
connector", this, t2);
+                }

Review Comment:
   Hm yeah I was essentially suggesting not having the additional try catch and 
keeping the stopped status update in a finally block. Things look good after 
your latest pushed changes though 👍



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1090,6 +1090,39 @@ public void putConnectorConfig(final String connName, 
final Map<String, String>
         );
     }
 
+    @Override
+    public void stopConnector(final String connName, final Callback<Void> 
callback) {
+        log.trace("Submitting request to transition connector {} to STOPPED 
state", connName);
+
+        addRequest(
+                () -> {
+                    refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment:
   > I do want to add a forced read-to-end after we write the new target state, 
though, so that the leader can correctly reject attempts to write new task 
configs for the connector if it is stopped.
   
   Yeah, that makes sense 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to