gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1602156155


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -103,10 +113,11 @@ public void start(Map<String, String> props) {
         targetAdminClient = 
config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
         metrics = config.metrics();
         idleConsumerGroupsOffset = new HashMap<>();
-        checkpointsPerConsumerGroup = new HashMap<>();
+        Optional<Map<String, Map<TopicPartition, Checkpoint>>> checkpoints = 
readCheckpoints(config);

Review Comment:
   This is a potentially long blocking operation, and those should be avoided 
in start() methods because while the task is starting, it can't be stopped, and 
if the task can't be stopped within `task.shutdown.graceful.timeout.ms` it is 
aggressively cancelled.
   
   Since the main thread needs the result from readCheckpoints, I think it 
would be fine to check if it's been loaded and if not, just return an empty 
poll().



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +127,73 @@ public void start(Map<String, String> props) {
                 consumerGroups.size(), sourceClusterAlias, 
config.targetClusterAlias(), consumerGroups);
     }
 
+    // read the checkpoints topic to initialize the 
checkpointsPerConsumerGroup state of this task
+    // the callback may only handle errors thrown by consumer.poll in 
KafkaBasedLog
+    // e.g. unauthorized to read from topic (non-retriable)
+    // if any are encountered, treat the loading of Checkpoints as failed.
+    Optional<Map<String, Map<TopicPartition, Checkpoint>>> 
readCheckpoints(MirrorCheckpointTaskConfig config) {
+        AtomicBoolean successful = new AtomicBoolean(true);
+        Map<String, Map<TopicPartition, Checkpoint>> checkpoints = new 
HashMap<>();
+        Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new 
Callback<ConsumerRecord<byte[], byte[]>>() {
+            @Override
+            public void onCompletion(Throwable error, ConsumerRecord<byte[], 
byte[]> cpRecord) {
+                if (error != null && successful.getAndSet(false)) {
+                    log.error("Error loading Checkpoint topic", error);

Review Comment:
   I'm on the fence whether this should be error or warn. It it something that 
the user _must_ address? I'm not so sure.
   
   I do think that this should have an actionable recommendation, or an 
explanation that the task is gracefully degrading because of this.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -105,7 +106,13 @@ private KafkaBasedLog<byte[], byte[]> 
createBackingStore(MirrorCheckpointConfig
     /**
      * Start the OffsetSyncStore, blocking until all previous Offset Syncs 
have been read from backing storage.
      */
-    public void start() {
+    public void start(boolean initializationMustReadToEnd) {
+        this.initializationMustReadToEnd = initializationMustReadToEnd;
+        if (initializationMustReadToEnd) {
+            log.warn("OffsetSyncStore initializationMustReadToEnd = {}", 
initializationMustReadToEnd);

Review Comment:
   debug level, this is not worth warning about.
   
   :+1: for the variable name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to