C0urante commented on a change in pull request #11775: URL: https://github.com/apache/kafka/pull/11775#discussion_r818848052
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java ########## @@ -396,13 +441,58 @@ ConfigDef.Importance.LOW, INTER_WORKER_VERIFICATION_ALGORITHMS_DOC); + private final ExactlyOnceSourceSupport exactlyOnceSourceSupport; + @Override public Integer getRebalanceTimeout() { return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG); } + @Override + public boolean exactlyOnceSourceEnabled() { + return exactlyOnceSourceSupport == ExactlyOnceSourceSupport.ENABLED; + } + + /** + * @return whether the Connect cluster's leader should use a transactional producer to perform writes to the config + * topic, which is useful for ensuring that zombie leaders are fenced out and unable to write to the topic after a + * new leader has been elected. + */ + public boolean transactionalLeaderEnabled() { + return exactlyOnceSourceSupport.usesTransactionalLeader; + } + + /** + * @return the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG transactional ID} to use for the worker's producer if + * the worker is the leader of the cluster and is + * {@link #transactionalLeaderEnabled() configured to use a transactional producer}. Review comment: Good call. I think rewording is probably safer just to avoid unexpected NPEs or at least unnecessary refactoring if we end up leveraging this method for more than just exactly-once support later on and don't want the return value to be dependent on things like the `exactly.once.source.support` property. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org