C0urante commented on code in PR #13367:
URL: https://github.com/apache/kafka/pull/13367#discussion_r1134001434
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##########
@@ -99,11 +99,13 @@ public void testOffsetSync() {
         partitionState.reset();
         assertFalse(partitionState.update(3, 152), "no sync");
         partitionState.reset();
-        assertFalse(partitionState.update(4, 153), "no sync");
+        assertTrue(partitionState.update(4, 153), "one past target offset");
         partitionState.reset();
         assertFalse(partitionState.update(5, 154), "no sync");
         partitionState.reset();
-        assertTrue(partitionState.update(6, 205), "one past target offset");
+        assertFalse(partitionState.update(6, 203), "one past target offset");

Review Comment:
   Copy-paste error? The message should match the expected result:
   ```suggestion
           assertFalse(partitionState.update(6, 203), "no sync");
   ```

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -316,13 +315,14 @@ static class PartitionState {
         // true if we should emit an offset sync
         boolean update(long upstreamOffset, long downstreamOffset) {
-            long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
-            long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep;
+            // This value is what OffsetSyncStore::translateOffsets would compute for this offset given the last sync.
+            // Because this method is called at most once for each upstream offset, simplify upstreamStep to 1.

Review Comment:
   This comment mentions `upstreamStep`, but we no longer have a variable with that name. Is this line really necessary? Do you think it would be clear enough with just the comments on lines 318 and 320?
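For readers following along, the decision the test exercises can be pictured with a simplified, hypothetical model of `PartitionState`. This is a sketch, not the actual MirrorMaker implementation: the class name, the `maxOffsetLag` parameter, and the drift check are assumptions inferred from the diff above, and real `PartitionState` has additional conditions.

```java
// Hypothetical, simplified model of the offset-sync decision in
// MirrorSourceTask.PartitionState. NOT the real implementation: it only
// illustrates the idea that a sync is emitted when the actual downstream
// offset drifts at least maxOffsetLag past the offset that the last sync
// would predict for this upstream offset.
class PartitionStateModel {
    long lastSyncUpstreamOffset = -1L;
    long lastSyncDownstreamOffset = -1L;
    final long maxOffsetLag;

    PartitionStateModel(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    // Returns true if an offset sync should be emitted for this record pair.
    boolean update(long upstreamOffset, long downstreamOffset) {
        // What a translation based on the last emitted sync would predict
        // for this upstream offset.
        long downstreamTargetOffset =
                lastSyncDownstreamOffset + (upstreamOffset - lastSyncUpstreamOffset);
        boolean sync = lastSyncDownstreamOffset < 0
                || downstreamOffset - downstreamTargetOffset >= maxOffsetLag;
        if (sync) {
            lastSyncUpstreamOffset = upstreamOffset;
            lastSyncDownstreamOffset = downstreamOffset;
        }
        return sync;
    }
}

public class PartitionStateModelDemo {
    public static void main(String[] args) {
        PartitionStateModel state = new PartitionStateModel(100);
        System.out.println(state.update(0, 0));   // first record: always sync -> true
        System.out.println(state.update(1, 50));  // drift 50 - 1 = 49 < 100 -> false
        System.out.println(state.update(2, 150)); // drift 150 - 2 = 148 >= 100 -> true
    }
}
```

The model makes the test's wording concrete: "one past target offset" means the downstream offset has moved beyond where the last sync predicted it should be, so a new sync is warranted.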
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -870,6 +864,20 @@ protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster,
         }
     }

+    protected static void assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> targetConsumer) {
+        ConsumerRecords<byte[], byte[]> records = targetConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+        // After a full sync, there should be at most offset.lag.max records per partition consumed by both upstream and downstream consumers.
+        Map<TopicPartition, Integer> perPartitionCount = new HashMap<>();
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+            int previous = perPartitionCount.getOrDefault(tp, 0);
+            perPartitionCount.put(tp, previous + 1);
+        }
+        for (Map.Entry<TopicPartition, Integer> entry : perPartitionCount.entrySet()) {
+            assertTrue(entry.getValue() < OFFSET_LAG_MAX, "downstream consumer is re-reading more than " + OFFSET_LAG_MAX + " records from" + entry.getKey());
+        }

Review Comment:
   This can be simplified:
   ```suggestion
        for (TopicPartition tp : records.partitions()) {
            int count = records.records(tp).size();
            assertTrue(count < OFFSET_LAG_MAX, "downstream consumer is re-reading more than " + OFFSET_LAG_MAX + " records from" + tp);
        }
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
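As an aside, the simplification suggested in the last comment can be demonstrated without a Kafka dependency. The sketch below is JDK-only: plain `String` keys stand in for `TopicPartition`, and `Collectors.groupingBy` plays the role of `ConsumerRecords.partitions()` plus `records(tp).size()`; it shows the two counting styles produce the same per-partition counts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// JDK-only sketch of the simplification the review suggests: instead of
// incrementing a HashMap by hand for each record, group the records once
// and read each group's size. "t-0"/"t-1" are hypothetical partition names.
public class PerPartitionCountDemo {
    public static void main(String[] args) {
        List<String> recordPartitions = List.of("t-0", "t-0", "t-1", "t-0", "t-1");

        // Manual counting, as in the original assertDownstreamRedeliveriesBoundedByMaxLag:
        Map<String, Integer> manual = new HashMap<>();
        for (String tp : recordPartitions) {
            manual.put(tp, manual.getOrDefault(tp, 0) + 1);
        }

        // Grouping once, analogous to iterating records.partitions() and
        // taking records.records(tp).size():
        Map<String, Long> grouped = recordPartitions.stream()
                .collect(Collectors.groupingBy(tp -> tp, Collectors.counting()));

        System.out.println(manual);  // t-0 -> 3, t-1 -> 2
        System.out.println(grouped); // same counts
    }
}
```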