This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 953a3a52c4 MINOR: Update comment on verifyTaskGenerationAndOwnership method in DistributedHerder 953a3a52c4 is described below commit 953a3a52c463ecfd2f1c3db46dc44b4c8dbb06d8 Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Fri Jul 29 02:48:35 2022 +0530 MINOR: Update comment on verifyTaskGenerationAndOwnership method in DistributedHerder Reviewers: Chris Egerton <fearthecel...@gmail.com> --- .../kafka/connect/runtime/distributed/DistributedHerder.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index ded833da59..388bfa4218 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -1732,9 +1732,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { throw ConnectUtils.maybeWrap(cause, "Failed to perform round of zombie fencing"); } }, - () -> { - verifyTaskGenerationAndOwnership(taskId, taskGeneration); - } + () -> verifyTaskGenerationAndOwnership(taskId, taskGeneration) ); } else { return worker.startSourceTask( @@ -1941,8 +1939,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } } - // Currently unused, but will be invoked by exactly-once source tasks after they have successfully - // initialized their transactional producer + // Invoked by exactly-once worker source tasks after they have successfully initialized their transactional + // producer to ensure that it is still safe to bring up the task private void verifyTaskGenerationAndOwnership(ConnectorTaskId id, int initialTaskGen) { log.debug("Reading to end of config topic to ensure it is still safe to bring up source task {} with exactly-once support", id); if (!refreshConfigSnapshot(Long.MAX_VALUE)) {