C0urante commented on code in PR #13750: URL: https://github.com/apache/kafka/pull/13750#discussion_r1214589790
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1104,7 +1104,9 @@ public void stopConnector(final String connName, final Callback<Void> callback) writeTaskConfigs(connName, Collections.emptyList()); configBackingStore.putTargetState(connName, TargetState.STOPPED); // Force a read of the new target state for the connector - refreshConfigSnapshot(workerSyncTimeoutMs); + if (!refreshConfigSnapshot(workerSyncTimeoutMs)) { + throw new ConnectException("Failed to read to end of config topic"); Review Comment: Agreed, better to log a warning 👍 I was also toying with the idea that we could shift the responsibility of refreshing the view of the config topic from operations that write to the topic, to operations that read from the topic. For example, if we need to check that a connector exists before pausing/resuming/restarting it, then that operation fails if we can't update our view of the topic. I don't think this is necessary to do in this PR and can be a follow-up. It's also questionable if we should even bother since there's still race conditions that can come from target state updates that originate from non-leader workers. But it's at least worth some food for thought. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org