C0urante commented on code in PR #13367:
URL: https://github.com/apache/kafka/pull/13367#discussion_r1135661761


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -315,18 +314,16 @@ static class PartitionState {
 
         // true if we should emit an offset sync
         boolean update(long upstreamOffset, long downstreamOffset) {
-            // This value is what OffsetSyncStore::translateOffsets would 
compute for this offset given the last sync.
-            // Because this method is called at most once for each upstream 
offset, simplify upstreamStep to 1.
+            // Emit an offset sync if any of the following conditions are true
+            boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == 
-1L;
+            // the OffsetSync::translateDownstream method will translate this 
offset 1 past the last sync, so add 1.
             // TODO: share common implementation to enforce this relationship
-            long downstreamTargetOffset = lastSyncDownstreamOffset + 1;
-            if (lastSyncDownstreamOffset == -1L
-                    || downstreamOffset - downstreamTargetOffset >= 
maxOffsetLag
-                    || upstreamOffset - previousUpstreamOffset != 1L

Review Comment:
   Why are we removing this condition?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -315,18 +314,16 @@ static class PartitionState {
 
         // true if we should emit an offset sync
         boolean update(long upstreamOffset, long downstreamOffset) {
-            // This value is what OffsetSyncStore::translateOffsets would 
compute for this offset given the last sync.
-            // Because this method is called at most once for each upstream 
offset, simplify upstreamStep to 1.
+            // Emit an offset sync if any of the following conditions are true
+            boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == 
-1L;
+            // the OffsetSync::translateDownstream method will translate this 
offset 1 past the last sync, so add 1.
             // TODO: share common implementation to enforce this relationship
-            long downstreamTargetOffset = lastSyncDownstreamOffset + 1;
-            if (lastSyncDownstreamOffset == -1L
-                    || downstreamOffset - downstreamTargetOffset >= 
maxOffsetLag
-                    || upstreamOffset - previousUpstreamOffset != 1L

Review Comment:
   Why are we removing this condition? Is that necessary for this fix?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to