gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1602156155

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -103,10 +113,11 @@ public void start(Map<String, String> props) {
         targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
         metrics = config.metrics();
         idleConsumerGroupsOffset = new HashMap<>();
-        checkpointsPerConsumerGroup = new HashMap<>();
+        Optional<Map<String, Map<TopicPartition, Checkpoint>>> checkpoints = readCheckpoints(config);

Review Comment:
   This is a potentially long blocking operation, and those should be avoided in start() methods: a task can't be stopped while it is starting, and a task that can't be stopped within `task.shutdown.graceful.timeout.ms` is aggressively cancelled. Since the main thread needs the result from readCheckpoints, I think it would be fine for poll() to check whether the checkpoints have been loaded and, if not, just return an empty result.

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +127,73 @@ public void start(Map<String, String> props) {
                 consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
     }
+    // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+    // the callback may only handle errors thrown by consumer.poll in KafkaBasedLog
+    // e.g. unauthorized to read from topic (non-retriable)
+    // if any are encountered, treat the loading of Checkpoints as failed.
+    Optional<Map<String, Map<TopicPartition, Checkpoint>>> readCheckpoints(MirrorCheckpointTaskConfig config) {
+        AtomicBoolean successful = new AtomicBoolean(true);
+        Map<String, Map<TopicPartition, Checkpoint>> checkpoints = new HashMap<>();
+        Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
+            @Override
+            public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) {
+                if (error != null && successful.getAndSet(false)) {
+                    log.error("Error loading Checkpoint topic", error);

Review Comment:
   I'm on the fence about whether this should be error or warn. Is it something that the user _must_ address? I'm not so sure. I do think that this should have an actionable recommendation, or an explanation that the task is gracefully degrading because of this.

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -105,7 +106,13 @@ private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorCheckpointConfig
     /**
      * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
      */
-    public void start() {
+    public void start(boolean initializationMustReadToEnd) {
+        this.initializationMustReadToEnd = initializationMustReadToEnd;
+        if (initializationMustReadToEnd) {
+            log.warn("OffsetSyncStore initializationMustReadToEnd = {}", initializationMustReadToEnd);

Review Comment:
   This should be debug level; it is not worth warning about.
   :+1: for the variable name
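
For reference, the first comment's suggestion (keep start() fast and have poll() return nothing until the checkpoints are loaded) could look roughly like the sketch below. This is only an illustration in plain JDK code, not the PR's implementation: `NonBlockingCheckpointTask`, `loader`, and `isLoaded()` are hypothetical names, and `Map<String, Long>` stands in for the nested checkpoint map returned by readCheckpoints.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for a task; this is not the MirrorCheckpointTask code from the PR.
class NonBlockingCheckpointTask {
    // Stands in for readCheckpoints(config); assumed to block for a potentially long time.
    private final Supplier<Map<String, Long>> loader;
    private volatile CompletableFuture<Map<String, Long>> loading;

    NonBlockingCheckpointTask(Supplier<Map<String, Long>> loader) {
        this.loader = loader;
    }

    // start() only schedules the load on another thread, so it returns promptly.
    void start() {
        loading = CompletableFuture.supplyAsync(loader);
    }

    // True once the background load has finished, successfully or not.
    boolean isLoaded() {
        return loading != null && loading.isDone();
    }

    // poll() returns an empty result until the load has finished.
    List<String> poll() {
        if (!isLoaded()) {
            return Collections.emptyList();
        }
        // A failed load degrades to "no previous checkpoints" instead of failing the task.
        Map<String, Long> checkpoints =
                loading.exceptionally(e -> Collections.<String, Long>emptyMap()).join();
        return new ArrayList<>(checkpoints.keySet());
    }
}
```

With this shape the load never holds up start(), and a failed load is handled in one place, which is also where a log message explaining the graceful degradation would naturally go.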