yashmayya commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1147189361

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -245,6 +245,14 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba
      */
     void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> cb);

+    /**
+     * Stop the conector. This call will asynchronously suspend processing by the connector and all
+     * of its tasks.

Review Comment:
   Thanks, looks good!

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -221,28 +223,44 @@ public boolean isRunning() {
     }

     @SuppressWarnings("fallthrough")
-    private void pause() {
+    private void stop(boolean paused) {
+        State newState = paused ? State.PAUSED : State.STOPPED;
         try {
-            switch (state) {
-                case STOPPED:
-                    return;
+            if ((state == State.STOPPED || state == State.PAUSED) && state == newState) {
+                // Already in the desired state
+                return;
+            }

-                case STARTED:
-                    connector.stop();
-                    // fall through
+            if (state == State.STARTED) {
+                connector.stop();
+            }

-                case INIT:
-                    statusListener.onPause(connName);
-                    this.state = State.STOPPED;
-                    break;
+            if (state == State.FAILED && newState != State.STOPPED) {
+                throw new IllegalArgumentException("Cannot transition to non-stopped state when connector has already failed");
+            }

-                default:
-                    throw new IllegalArgumentException("Cannot pause connector in state " + state);
+            if (paused) {
+                statusListener.onPause(connName);
+            } else {
+                statusListener.onStop(connName);
             }
+
+            this.state = newState;
         } catch (Throwable t) {
-            log.error("{} Error while shutting down connector", this, t);
-            statusListener.onFailure(connName, t);
-            this.state = State.FAILED;
+            log.error("{} Error while {} connector", this, paused ? "pausing" : "stopping", t);
+            if (paused) {
+                statusListener.onFailure(connName, t);
+                this.state = State.FAILED;
+            } else {
+                // We say the connector is STOPPED even if it fails at this point
+                this.state = State.STOPPED;
+                // One more try to make sure the status is updated correctly
+                try {
+                    statusListener.onStop(connName);
+                } catch (Throwable t2) {
+                    log.error("{} Error during failover attempt to stop connector", this, t2);
+                }

Review Comment:
   Hm yeah I was essentially suggesting not having the additional try catch and keeping the stopped status update in a finally block. Things look good after your latest pushed changes though 👍

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1090,6 +1090,39 @@ public void putConnectorConfig(final String connName, final Map<String, String>
         );
     }

+    @Override
+    public void stopConnector(final String connName, final Callback<Void> callback) {
+        log.trace("Submitting request to transition connector {} to STOPPED state", connName);
+
+        addRequest(
+                () -> {
+                    refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment:
   > I do want to add a forced read-to-end after we write the new target state, though, so that the leader can correctly reject attempts to write new task configs for the connector if it is stopped.

   Yeah, that makes sense 👍
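
For illustration only, the `finally`-based shape mentioned in the `WorkerConnector.java` comment might look roughly like the sketch below. It is not the code that was merged in the PR. `StopSketch`, its `Connector` interface, and the `events` list are hypothetical stand-ins rather than Kafka classes, and only the stop (non-pause) path is modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for WorkerConnector; not the Kafka class.
class StopSketch {
    enum State { INIT, STARTED, PAUSED, STOPPED, FAILED }

    // Hypothetical stand-in for the managed connector.
    interface Connector {
        void stop();
    }

    State state;
    final Connector connector;
    // Records what a status listener would have been told, in order.
    final List<String> events = new ArrayList<>();

    StopSketch(State initial, Connector connector) {
        this.state = initial;
        this.connector = connector;
    }

    void stop() {
        if (state == State.STOPPED) {
            // Already in the desired state; checked outside the try so the
            // finally block below does not emit a duplicate status update.
            return;
        }
        try {
            if (state == State.STARTED) {
                connector.stop();
            }
        } catch (Throwable t) {
            // Log the failure, but still treat the connector as stopped.
            events.add("error: " + t.getMessage());
        } finally {
            // Runs whether or not connector.stop() threw, so no second
            // try/catch is needed just to publish the STOPPED status.
            state = State.STOPPED;
            events.add("onStop");
        }
    }
}
```

With this shape, a connector whose `stop()` throws still ends up in `STOPPED` and the stop status is recorded exactly once. The sketch does not handle a failure in the status update itself, which the nested try/catch in the diff above does.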