prestona commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1602234146
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +127,73 @@ public void start(Map<String, String> props) {
                 consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
     }
+    // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+    // the callback may only handle errors thrown by consumer.poll in KafkaBasedLog
+    // e.g. unauthorized to read from topic (non-retriable)
+    // if any are encountered, treat the loading of Checkpoints as failed.
+    Optional<Map<String, Map<TopicPartition, Checkpoint>>> readCheckpoints(MirrorCheckpointTaskConfig config) {
+        AtomicBoolean successful = new AtomicBoolean(true);
+        Map<String, Map<TopicPartition, Checkpoint>> checkpoints = new HashMap<>();
+        Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
+            @Override
+            public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) {
+                if (error != null && successful.getAndSet(false)) {
+                    log.error("Error loading Checkpoint topic", error);

Review Comment:
   If we special case not authorized (as above), then the main reasons for hitting this are (hopefully?) transitory problems - for example: all brokers being down when the connector is first started. I agree that this should be a warning with a better explanation of impact.
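
To make the suggestion concrete, here is a rough sketch of how the callback's error branch could split the two cases. It is not the PR's code: `CheckpointLoadErrorSketch`, `LoadErrorHandler`, `LogDecision`, `Severity` and `StandInAuthorizationException` are hypothetical names, the exception is a local stand-in so the sketch compiles without Kafka on the classpath, and the log messages are illustrative wording only. The one thing taken from the diff is the `AtomicBoolean.getAndSet(false)` check, which reports only the first error.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

class CheckpointLoadErrorSketch {

    /** Hypothetical stand-in for an authorization failure; not a Kafka class. */
    static class StandInAuthorizationException extends RuntimeException {
        StandInAuthorizationException(String message) {
            super(message);
        }
    }

    enum Severity { ERROR, WARN }

    /** What to log for a failed load: a level plus a message (illustrative wording). */
    static final class LogDecision {
        final Severity severity;
        final String message;

        LogDecision(Severity severity, String message) {
            this.severity = severity;
            this.message = message;
        }
    }

    /** Tracks whether loading is still considered successful and reports only the first error. */
    static final class LoadErrorHandler {
        private final AtomicBoolean successful = new AtomicBoolean(true);

        Optional<LogDecision> onError(Throwable error) {
            // getAndSet returns the previous value, so only the first error yields a decision.
            if (error == null || !successful.getAndSet(false)) {
                return Optional.empty();
            }
            if (error instanceof StandInAuthorizationException) {
                // Special-cased: assumed non-retriable, so it stays at error level.
                return Optional.of(new LogDecision(Severity.ERROR,
                        "Not authorized to read the checkpoints topic; "
                                + "treating checkpoint loading as failed."));
            }
            // Everything else is assumed to be transient, e.g. brokers down at startup.
            return Optional.of(new LogDecision(Severity.WARN,
                    "Could not read the checkpoints topic (possibly a transient problem); "
                            + "treating checkpoint loading as failed."));
        }

        boolean successful() {
            return successful.get();
        }
    }

    public static void main(String[] args) {
        LoadErrorHandler handler = new LoadErrorHandler();
        handler.onError(new RuntimeException("all brokers down"))
                .ifPresent(d -> System.out.println(d.severity + ": " + d.message));
    }
}
```

In the real task the check would presumably be against Kafka's own authorization exception type, and the warning text would need to spell out what the task does when the load fails.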