prestona commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1602234146


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +127,73 @@ public void start(Map<String, String> props) {
                 consumerGroups.size(), sourceClusterAlias, 
config.targetClusterAlias(), consumerGroups);
     }
 
+    // read the checkpoints topic to initialize the 
checkpointsPerConsumerGroup state of this task
+    // the callback may only handle errors thrown by consumer.poll in 
KafkaBasedLog
+    // e.g. unauthorized to read from topic (non-retriable)
+    // if any are encountered, treat the loading of Checkpoints as failed.
+    Optional<Map<String, Map<TopicPartition, Checkpoint>>> 
readCheckpoints(MirrorCheckpointTaskConfig config) {
+        AtomicBoolean successful = new AtomicBoolean(true);
+        Map<String, Map<TopicPartition, Checkpoint>> checkpoints = new 
HashMap<>();
+        Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new 
Callback<ConsumerRecord<byte[], byte[]>>() {
+            @Override
+            public void onCompletion(Throwable error, ConsumerRecord<byte[], 
byte[]> cpRecord) {
+                if (error != null && successful.getAndSet(false)) {
+                    log.error("Error loading Checkpoint topic", error);

Review Comment:
   If we special case not authorized (as above), then the main reasons for 
hitting this are (hopefully?) transitory problems - for example: all brokers 
being down when the connector is first started. I agree that this should be a 
warning with a better explanation of impact.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to