gharris1727 commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1600452206
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ########## @@ -116,6 +125,71 @@ public void start(Map<String, String> props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } + // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task + private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) { + + class CheckpointRecordHandler { + private volatile KafkaException lastLoggedErrorReadingCheckpoints = null; + + void handle(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) { + // See KafkaBasedLog.poll : only KafkaException can be passed as error + if (error instanceof KafkaException) { + // only log once + if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) { + log.error("Error loading Checkpoint topic", error); + lastLoggedErrorReadingCheckpoints = (KafkaException) error; + } + + if (error instanceof RetriableException) { + return; + } else { + throw (KafkaException) error; + } + } else { // error is null + lastLoggedErrorReadingCheckpoints = null; + Checkpoint cp = Checkpoint.deserializeRecord(cpRecord); Review Comment: deserialization can fail due to bad data in the topic ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ########## @@ -116,6 +125,71 @@ public void start(Map<String, String> props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } + // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task + private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) { + + class CheckpointRecordHandler { + private volatile KafkaException lastLoggedErrorReadingCheckpoints = null; + + void handle(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) { + // See KafkaBasedLog.poll : only KafkaException can 
be passed as error + if (error instanceof KafkaException) { + // only log once + if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) { + log.error("Error loading Checkpoint topic", error); + lastLoggedErrorReadingCheckpoints = (KafkaException) error; + } + + if (error instanceof RetriableException) { + return; + } else { + throw (KafkaException) error; + } + } else { // error is null + lastLoggedErrorReadingCheckpoints = null; + Checkpoint cp = Checkpoint.deserializeRecord(cpRecord); + if (consumerGroups.contains(cp.consumerGroupId())) { + Map<TopicPartition, Checkpoint> cps = checkpointsPerConsumerGroup.computeIfAbsent(cp.consumerGroupId(), ignored -> new HashMap<>()); + cps.put(cp.topicPartition(), cp); + } + } + } + } + + CheckpointRecordHandler handler = new CheckpointRecordHandler(); + TopicAdmin cpAdmin = null; + KafkaBasedLog<byte[], byte[]> previousCheckpoints = null; + + try { + cpAdmin = new TopicAdmin( + config.targetAdminConfig("checkpoint-target-admin"), + config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"))); + + previousCheckpoints = KafkaBasedLog.withExistingClients( + config.checkpointsTopic(), + MirrorUtils.newConsumer(config.targetConsumerConfig(CHECKPOINTS_TARGET_CONSUMER_ROLE)), + null, + cpAdmin, + (error, cpRecord) -> handler.handle(error, cpRecord), + Time.SYSTEM, + ignored -> { }, + topicPartition -> topicPartition.partition() == 0); + + log.info("Starting loading Checkpoint topic : {}", config.checkpointsTopic()); + previousCheckpoints.start(true); + previousCheckpoints.stop(); + log.info("Finished loading Checkpoint topic : {}", config.checkpointsTopic()); + log.debug("Initial checkpointsPerConsumerGroup : {}", checkpointsPerConsumerGroup); + return true; + } catch (KafkaException kexc) { Review Comment: This relies on the fact that exceptions thrown from callbacks propagate synchronously to start(), which is an implementation detail of the 
KafkaBasedLog. It would make more sense to me if this class expected exceptions to be propagated via the callback, and then tested to see if the callback recorded any exceptions. At the moment the state variable in your handler is just a log-deduplication mechanism, but it feels like it should alter the control flow. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ########## @@ -116,6 +125,71 @@ public void start(Map<String, String> props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } + // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task + private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) { + + class CheckpointRecordHandler { + private volatile KafkaException lastLoggedErrorReadingCheckpoints = null; + + void handle(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) { + // See KafkaBasedLog.poll : only KafkaException can be passed as error + if (error instanceof KafkaException) { + // only log once + if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) { + log.error("Error loading Checkpoint topic", error); + lastLoggedErrorReadingCheckpoints = (KafkaException) error; + } + + if (error instanceof RetriableException) { + return; + } else { + throw (KafkaException) error; + } + } else { // error is null + lastLoggedErrorReadingCheckpoints = null; + Checkpoint cp = Checkpoint.deserializeRecord(cpRecord); + if (consumerGroups.contains(cp.consumerGroupId())) { + Map<TopicPartition, Checkpoint> cps = checkpointsPerConsumerGroup.computeIfAbsent(cp.consumerGroupId(), ignored -> new HashMap<>()); Review Comment: This map will need to be thread-safe now. 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ########## @@ -116,6 +125,71 @@ public void start(Map<String, String> props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } + // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task + private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) { + + class CheckpointRecordHandler { + private volatile KafkaException lastLoggedErrorReadingCheckpoints = null; + + void handle(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) { + // See KafkaBasedLog.poll : only KafkaException can be passed as error Review Comment: I think this assumption is detrimental, and this code would be broken if the assumption is violated in the future. We can just broaden the types to Throwable so that if a non-KafkaException comes through the callback, we handle it properly (instead of assuming it's null). ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ########## @@ -104,8 +105,13 @@ private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorCheckpointConfig /** * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage. + * @param optimisticLoading */ - public void start() { + public void start(boolean optimisticLoading) { + this.pessimisticLoading = !optimisticLoading; Review Comment: Is there any reason that the callers of start are "optimistic" and the implementation is "pessimistic"? Would it make sense for everyone to share the interpretation of this boolean? ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ########## @@ -39,10 +39,12 @@ static class FakeOffsetSyncStore extends OffsetSyncStore { super(); } - @Override - public void start() { Review Comment: Okay yeah this start method is clunky.
I would still like to have OffsetSyncStoreTest use the real start method though. I think a more idiomatic way of doing this would be to move `backingStore.start()` to a protected startBackingStore() method that can be overridden with a no-op in FakeOffsetSyncStore. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ########## @@ -116,6 +125,71 @@ public void start(Map<String, String> props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } + // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task + private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) { + + class CheckpointRecordHandler { Review Comment: This is my first time seeing a local class in this codebase, it's a little bit strange. I would make this an anonymous class of Callback<ConsumerRecord<byte[], byte[]>> that uses an AtomicReference<KafkaException> instead to track the mutable state. 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ########## @@ -116,6 +125,71 @@ public void start(Map<String, String> props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } + // read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task + private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) { + + class CheckpointRecordHandler { + private volatile KafkaException lastLoggedErrorReadingCheckpoints = null; + + void handle(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) { + // See KafkaBasedLog.poll : only KafkaException can be passed as error + if (error instanceof KafkaException) { + // only log once + if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) { + log.error("Error loading Checkpoint topic", error); + lastLoggedErrorReadingCheckpoints = (KafkaException) error; + } + + if (error instanceof RetriableException) { + return; + } else { + throw (KafkaException) error; Review Comment: Typically callbacks don't rethrow the exceptions, they're meant to be informed about the errors, but leave the control flow for the caller to handle. I see that this is used to stop the KafkaBasedLog on receiving any fatal error, which is definitely needed in this case. Rather than having this control flow caused by exceptions and risk ungracefully killing the background thread: https://github.com/apache/kafka/blob/57d30d3450998465177f92516a41218dbe8d4340/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L609 maybe we could split the current `stop()` into an asynchronous stop which is called from the callback, and a blocking stop which other threads can use to wait for the background thread to exit and everything to be cleaned up. 
I'm not totally satisfied with that design either, so if you've got some other ideas, please explore it more. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org