C0urante commented on code in PR #13367:
URL: https://github.com/apache/kafka/pull/13367#discussion_r1134001434


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##########
@@ -99,11 +99,13 @@ public void testOffsetSync() {
         partitionState.reset();
         assertFalse(partitionState.update(3, 152), "no sync");
         partitionState.reset();
-        assertFalse(partitionState.update(4, 153), "no sync");
+        assertTrue(partitionState.update(4, 153), "one past target offset");
         partitionState.reset();
         assertFalse(partitionState.update(5, 154), "no sync");
         partitionState.reset();
-        assertTrue(partitionState.update(6, 205), "one past target offset");
+        assertFalse(partitionState.update(6, 203), "one past target offset");

Review Comment:
   Copy-paste error?
   
   ```suggestion
           assertFalse(partitionState.update(6, 203), "no sync");
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -316,13 +315,14 @@ static class PartitionState {
 
         // true if we should emit an offset sync
         boolean update(long upstreamOffset, long downstreamOffset) {
-            long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
-            long downstreamTargetOffset = lastSyncDownstreamOffset + 
upstreamStep;
+            // This value is what OffsetSyncStore::translateOffsets would 
compute for this offset given the last sync.
+            // Because this method is called at most once for each upstream 
offset, simplify upstreamStep to 1.

Review Comment:
   This comment mentions `upstreamStep` but we no longer have a field named 
that.
   
   Is this line really necessary? Do you think it would be clear enough with 
just the comments on lines 318 and 320?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -870,6 +864,20 @@ protected static void 
assertMonotonicCheckpoints(EmbeddedConnectCluster cluster,
         }
     }
 
+    protected static void 
assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> 
targetConsumer) {
+        ConsumerRecords<byte[], byte[]> records = 
targetConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+        // After a full sync, there should be at most offset.lag.max records 
per partition consumed by both upstream and downstream consumers.
+        Map<TopicPartition, Integer> perPartitionCount = new HashMap<>();
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            TopicPartition tp = new TopicPartition(record.topic(), 
record.partition());
+            int previous = perPartitionCount.getOrDefault(tp, 0);
+            perPartitionCount.put(tp, previous + 1);
+        }
+        for (Map.Entry<TopicPartition, Integer> entry : 
perPartitionCount.entrySet()) {
+            assertTrue(entry.getValue() < OFFSET_LAG_MAX,  "downstream 
consumer is re-reading more than " + OFFSET_LAG_MAX + " records from" + 
entry.getKey());
+        }

Review Comment:
   This can be simplified:
   
   ```suggestion
           for (TopicPartition tp : records.partitions()) {
               int count = records.records(tp).size();
               assertTrue(count < OFFSET_LAG_MAX,  "downstream consumer is 
re-reading more than " + OFFSET_LAG_MAX + " records from" + tp);
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to