gharris1727 commented on code in PR #15910:
URL: https://github.com/apache/kafka/pull/15910#discussion_r1600452206


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
                 consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
     }
 
+    // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+    private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+        class CheckpointRecordHandler {
+            private volatile KafkaException lastLoggedErrorReadingCheckpoints = null;
+
+            void handle(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) {
+                // See KafkaBasedLog.poll : only KafkaException can be passed as error
+                if (error instanceof KafkaException) {
+                    // only log once
+                    if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+                        log.error("Error loading Checkpoint topic", error);
+                        lastLoggedErrorReadingCheckpoints = (KafkaException) error;
+                    }
+
+                    if (error instanceof RetriableException) {
+                        return;
+                    } else {
+                        throw (KafkaException) error;
+                    }
+                } else { // error is null
+                    lastLoggedErrorReadingCheckpoints = null;
+                    Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);

Review Comment:
   deserialization can fail due to bad data in the topic
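   
   A minimal sketch of one way to guard against that, catching a broad RuntimeException purely for illustration (the exact exception type that `Checkpoint.deserializeRecord` throws for corrupt data would need to be confirmed):
   
   ```java
   final Checkpoint cp;
   try {
       cp = Checkpoint.deserializeRecord(cpRecord);
   } catch (RuntimeException e) {
       // skip undeserializable records instead of failing the whole load
       log.warn("Skipping invalid record in checkpoints topic, partition {} offset {}",
               cpRecord.partition(), cpRecord.offset(), e);
       return;
   }
   ```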



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
                 consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
     }
 
+    // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+    private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+        class CheckpointRecordHandler {
+            private volatile KafkaException lastLoggedErrorReadingCheckpoints = null;
+
+            void handle(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) {
+                // See KafkaBasedLog.poll : only KafkaException can be passed as error
+                if (error instanceof KafkaException) {
+                    // only log once
+                    if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+                        log.error("Error loading Checkpoint topic", error);
+                        lastLoggedErrorReadingCheckpoints = (KafkaException) error;
+                    }
+
+                    if (error instanceof RetriableException) {
+                        return;
+                    } else {
+                        throw (KafkaException) error;
+                    }
+                } else { // error is null
+                    lastLoggedErrorReadingCheckpoints = null;
+                    Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);
+                    if (consumerGroups.contains(cp.consumerGroupId())) {
+                        Map<TopicPartition, Checkpoint> cps = checkpointsPerConsumerGroup.computeIfAbsent(cp.consumerGroupId(), ignored -> new HashMap<>());
+                        cps.put(cp.topicPartition(), cp);
+                    }
+                }
+            }
+        }
+
+        CheckpointRecordHandler handler = new CheckpointRecordHandler();
+        TopicAdmin cpAdmin = null;
+        KafkaBasedLog<byte[], byte[]> previousCheckpoints = null;
+
+        try {
+            cpAdmin = new TopicAdmin(
+                    config.targetAdminConfig("checkpoint-target-admin"),
+                    config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")));
+
+            previousCheckpoints = KafkaBasedLog.withExistingClients(
+                    config.checkpointsTopic(),
+                    MirrorUtils.newConsumer(config.targetConsumerConfig(CHECKPOINTS_TARGET_CONSUMER_ROLE)),
+                    null,
+                    cpAdmin,
+                    (error, cpRecord) -> handler.handle(error, cpRecord),
+                    Time.SYSTEM,
+                    ignored -> { },
+                    topicPartition -> topicPartition.partition() == 0);
+
+            log.info("Starting loading Checkpoint topic : {}", config.checkpointsTopic());
+            previousCheckpoints.start(true);
+            previousCheckpoints.stop();
+            log.info("Finished loading Checkpoint topic : {}", config.checkpointsTopic());
+            log.debug("Initial checkpointsPerConsumerGroup : {}", checkpointsPerConsumerGroup);
+            return true;
+        }  catch (KafkaException kexc) {

Review Comment:
   This relies on the fact that exceptions thrown from callbacks propagate 
synchronously to start(), which is an implementation detail of the 
KafkaBasedLog. It would make more sense to me if this class expected exceptions 
to be propagated via the callback, and then tested to see if the callback 
recorded any exceptions. At the moment the state variable in your handler is 
just a log-deduplication mechanism, but it feels like it should alter the 
control flow.
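   
   Something in this direction, as a rough sketch (the `fatalError` reference and the `handleRecord` helper are illustrative, not existing code):
   
   ```java
   AtomicReference<KafkaException> fatalError = new AtomicReference<>();
   Callback<ConsumerRecord<byte[], byte[]>> callback = (error, cpRecord) -> {
       if (error instanceof RetriableException) {
           log.warn("Retriable error loading Checkpoint topic", error);
       } else if (error != null) {
           // record the first fatal error instead of throwing out of the callback
           fatalError.compareAndSet(null, new KafkaException(error));
       } else {
           handleRecord(cpRecord); // same per-record logic as today
       }
   };
   
   previousCheckpoints.start(true);
   previousCheckpoints.stop();
   if (fatalError.get() != null) {
       log.warn("Could not load previous checkpoints", fatalError.get());
       return false;
   }
   return true;
   ```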



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
                 consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
     }
 
+    // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+    private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+        class CheckpointRecordHandler {
+            private volatile KafkaException lastLoggedErrorReadingCheckpoints = null;
+
+            void handle(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) {
+                // See KafkaBasedLog.poll : only KafkaException can be passed as error
+                if (error instanceof KafkaException) {
+                    // only log once
+                    if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+                        log.error("Error loading Checkpoint topic", error);
+                        lastLoggedErrorReadingCheckpoints = (KafkaException) error;
+                    }
+
+                    if (error instanceof RetriableException) {
+                        return;
+                    } else {
+                        throw (KafkaException) error;
+                    }
+                } else { // error is null
+                    lastLoggedErrorReadingCheckpoints = null;
+                    Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);
+                    if (consumerGroups.contains(cp.consumerGroupId())) {
+                        Map<TopicPartition, Checkpoint> cps = checkpointsPerConsumerGroup.computeIfAbsent(cp.consumerGroupId(), ignored -> new HashMap<>());

Review Comment:
   This map will need to be thread-safe now.
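   
   For example, assuming the field is currently declared as a plain `Map<String, Map<TopicPartition, Checkpoint>>` backed by a HashMap, something like:
   
   ```java
   private final Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new ConcurrentHashMap<>();
   ```
   
   (and the inner per-group maps may need the same treatment if they are read and written from different threads).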



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
                 consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
     }
 
+    // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+    private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+        class CheckpointRecordHandler {
+            private volatile KafkaException lastLoggedErrorReadingCheckpoints = null;
+
+            void handle(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) {
+                // See KafkaBasedLog.poll : only KafkaException can be passed as error

Review Comment:
   I think this assumption is detrimental, and this code would be broken if the 
assumption is violated in the future. We can just broaden the types to Throwable 
so that if a non-KafkaException comes through the callback, we handle it 
properly (instead of assuming it's null).
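   
   Roughly this shape, as a sketch (with `lastLoggedErrorReadingCheckpoints` widened to Throwable, and the per-record handling factored into a hypothetical `handleRecord` for brevity):
   
   ```java
   void handle(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) {
       if (error != null) {
           // log and react to any Throwable, not only KafkaException
           if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
               log.error("Error loading Checkpoint topic", error);
               lastLoggedErrorReadingCheckpoints = error;
           }
           if (!(error instanceof RetriableException)) {
               throw error instanceof KafkaException ? (KafkaException) error : new KafkaException(error);
           }
           return;
       }
       lastLoggedErrorReadingCheckpoints = null;
       handleRecord(cpRecord);
   }
   ```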



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -104,8 +105,13 @@ private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorCheckpointConfig
 
     /**
      * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
+     * @param optimisticLoading
      */
-    public void start() {
+    public void start(boolean optimisticLoading) {
+        this.pessimisticLoading = !optimisticLoading;

Review Comment:
   Is there any reason that the callers of start are "optimistic" and the 
implementation is "pessimistic"? Would it make sense for everyone to share the 
interpretation of this boolean?
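   
   For instance, keeping a single name and meaning on both sides (field name here is illustrative only):
   
   ```java
   public void start(boolean optimisticLoading) {
       this.optimisticLoading = optimisticLoading;
       // remainder of start() unchanged
   }
   ```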



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -39,10 +39,12 @@ static class FakeOffsetSyncStore extends OffsetSyncStore {
             super();
         }
 
-        @Override
-        public void start() {

Review Comment:
   Okay yeah this start method is clunky. I would still like to have 
OffsetSyncStoreTest use the real start method though. I think a more idiomatic 
way of doing this would be to move `backingStore.start()` to a protected 
startBackingStore() method that can be overridden with a no-op in 
FakeOffsetSyncStore.
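   
   Something like this, as a sketch (the rest of `start()` is assumed unchanged apart from the extracted call):
   
   ```java
   // OffsetSyncStore
   protected void startBackingStore() {
       backingStore.start();
   }
   
   public void start(boolean optimisticLoading) {
       this.pessimisticLoading = !optimisticLoading;
       startBackingStore();
       // remainder of start() unchanged
   }
   
   // FakeOffsetSyncStore in OffsetSyncStoreTest
   @Override
   protected void startBackingStore() {
       // no-op: the test feeds offset syncs directly
   }
   ```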



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
                 consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
     }
 
+    // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+    private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+        class CheckpointRecordHandler {

Review Comment:
   This is my first time seeing a local class in this codebase; it's a little bit strange.
   
   I would make this an anonymous class of `Callback<ConsumerRecord<byte[], byte[]>>` that uses an `AtomicReference<KafkaException>` instead to track the mutable state.
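   
   Roughly this shape (a sketch, assuming the Connect `Callback` interface that KafkaBasedLog already takes; the body stays the same apart from swapping the volatile field for the AtomicReference):
   
   ```java
   AtomicReference<KafkaException> lastError = new AtomicReference<>();
   Callback<ConsumerRecord<byte[], byte[]>> handler = new Callback<ConsumerRecord<byte[], byte[]>>() {
       @Override
       public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) {
           // same body as CheckpointRecordHandler.handle, with lastError.get()/.set(...)
           // replacing the volatile lastLoggedErrorReadingCheckpoints field
       }
   };
   ```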



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -116,6 +125,71 @@ public void start(Map<String, String> props) {
                 consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
     }
 
+    // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task
+    private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) {
+
+        class CheckpointRecordHandler {
+            private volatile KafkaException lastLoggedErrorReadingCheckpoints = null;
+
+            void handle(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) {
+                // See KafkaBasedLog.poll : only KafkaException can be passed as error
+                if (error instanceof KafkaException) {
+                    // only log once
+                    if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) {
+                        log.error("Error loading Checkpoint topic", error);
+                        lastLoggedErrorReadingCheckpoints = (KafkaException) error;
+                    }
+
+                    if (error instanceof RetriableException) {
+                        return;
+                    } else {
+                        throw (KafkaException) error;

Review Comment:
   Typically callbacks don't rethrow exceptions; they're meant to be informed 
about errors but leave the control flow for the caller to handle. I see that 
this is used to stop the KafkaBasedLog on receiving any fatal error, which is 
definitely needed in this case.
   
   Rather than driving this control flow with exceptions and risking 
ungracefully killing the background thread 
(https://github.com/apache/kafka/blob/57d30d3450998465177f92516a41218dbe8d4340/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L609), 
maybe we could split the current `stop()` into an asynchronous stop which is 
called from the callback, and a blocking stop which other threads can use to 
wait for the background thread to exit and everything to be cleaned up.
   
   I'm not totally satisfied with that design either, so if you've got some 
other ideas, please explore them.
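   
   To make the idea concrete, roughly (a sketch only; the method names and the internals referenced in the comments are illustrative, not existing API):
   
   ```java
   // KafkaBasedLog: request shutdown without blocking; safe to call from the read callback
   public void stopAsync() {
       synchronized (this) {
           stopRequested = true; // assumed existing flag polled by the work thread
       }
       consumer.wakeup();        // interrupt a blocking poll()
   }
   
   // blocking stop for other threads: request shutdown, then wait and clean up
   public void stop() {
       stopAsync();
       // as today: join the work thread, then close the producer/consumer
   }
   ```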



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
