C0urante commented on code in PR #13750:
URL: https://github.com/apache/kafka/pull/13750#discussion_r1214589790
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1104,7 +1104,9 @@ public void stopConnector(final String connName, final Callback<Void> callback)
 writeTaskConfigs(connName, Collections.emptyList());
 configBackingStore.putTargetState(connName, TargetState.STOPPED);
 // Force a read of the new target state for the connector
-refreshConfigSnapshot(workerSyncTimeoutMs);
+if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
+    throw new ConnectException("Failed to read to end of config topic");
Review Comment:
Agreed, better to log a warning 👍
I was also toying with the idea that we could shift the responsibility of
refreshing the view of the config topic from operations that write to the
topic, to operations that read from the topic. For example, if we need to check
that a connector exists before pausing/resuming/restarting it, then that
operation fails if we can't update our view of the topic.
I don't think this is necessary in this PR; it can be a follow-up.
It's also questionable whether we should even bother, since there are still
race conditions that can come from target state updates that originate from
non-leader workers.
But it's at least food for thought.
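
To make the read-side idea above concrete, here is a rough sketch. It is not the actual `DistributedHerder` code: `ReadSideRefreshSketch`, `ConfigView`, `reachable`, and `warnings` are hypothetical stand-ins, and `pauseConnector` here is a simplified illustration rather than the real method. The only thing borrowed from the diff is the idea that the refresh returns a boolean. The write path only records a warning when the refresh fails, while the read path fails the operation if the view can't be updated.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; none of these types exist in Kafka Connect.
class ReadSideRefreshSketch {
    // Stand-in for the worker's local view of the config topic.
    static class ConfigView {
        final Set<String> connectors = new HashSet<>();
        boolean reachable = true; // simulates whether a read to the end succeeds

        // Assumed to behave like refreshConfigSnapshot(timeout): true on success.
        boolean refresh(long timeoutMs) {
            return reachable;
        }
    }

    final ConfigView view = new ConfigView();
    final List<String> warnings = new ArrayList<>(); // stand-in for log.warn(...)
    final long timeoutMs = 1000L;

    // Write path: the write has already been issued, so a failed refresh
    // is reported as a warning instead of failing the request.
    void stopConnector(String connName) {
        // (the writes to the config topic would happen here)
        if (!view.refresh(timeoutMs)) {
            warnings.add("Failed to read to end of config topic after stopping " + connName);
        }
    }

    // Read path: the existence check is only meaningful against a fresh view,
    // so the operation fails if the view can't be updated.
    void pauseConnector(String connName) {
        if (!view.refresh(timeoutMs)) {
            throw new IllegalStateException("Could not refresh view of config topic");
        }
        if (!view.connectors.contains(connName)) {
            throw new IllegalArgumentException("Unknown connector " + connName);
        }
        // (the target state write would happen here)
    }
}
```

This doesn't address the races from target state updates written by non-leader workers; it only moves the failure to the operations that depend on a fresh view.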