satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714975169



##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -120,6 +174,35 @@ public void run() {
         }
     }
 
+    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
+        boolean noOffsetUpdates = committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets);
+        if (noOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
+            log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, forceSync: {}", noOffsetUpdates, forceSync);
+            return;
+        }
+
+        try {
+            HashMap<Integer, Long> syncedPartitionToConsumedOffsets = new HashMap<>();
+            for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
+                int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
+                Long offset = partitionToConsumedOffsets.get(metadataPartition);
+                if (offset != null && !offset.equals(committedPartitionToConsumedOffsets.get(metadataPartition))) {
+                    remotePartitionMetadataEventHandler.syncLogMetadataDataFile(topicIdPartition, metadataPartition, offset);
+                    syncedPartitionToConsumedOffsets.put(metadataPartition, offset);
+                } else {
+                    log.debug("Skipping syncup of the remote-log-metadata-file for partition:{} , with remote log metadata partition{},  and offset:{} ",
+                            topicIdPartition, metadataPartition, offset);
+                }
+            }
+
+            committedOffsetsFile.writeEntries(partitionToConsumedOffsets);

Review comment:
       The earlier check needs partitionToConsumedOffsets, and
syncedPartitionToConsumedOffsets is not really needed here. Updated to write
partitionToConsumedOffsets to the committed offsets file and to set
committedPartitionToConsumedOffsets to partitionToConsumedOffsets.
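
For reference, a minimal standalone sketch of the skip check and the commit step described above. It is not the code in this PR: the class `OffsetSyncSketch` and its methods are hypothetical, the clock is passed in as a plain `long` instead of using `time.milliseconds()`, and the file write is replaced by an in-memory assignment. Taking a copy of the consumed map on commit is an assumption of the sketch; if both fields pointed at the same map, the `equals` check would report no updates forever.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the offset bookkeeping in ConsumerTask.
final class OffsetSyncSketch {
    private final Map<Integer, Long> consumed = new HashMap<>();
    private Map<Integer, Long> committed = new HashMap<>();
    private final long syncIntervalMs;
    private long lastSyncedTimeMs;

    OffsetSyncSketch(long syncIntervalMs, long startTimeMs) {
        this.syncIntervalMs = syncIntervalMs;
        this.lastSyncedTimeMs = startTimeMs;
    }

    // Records the latest consumed offset for a metadata partition.
    void record(int metadataPartition, long offset) {
        consumed.put(metadataPartition, offset);
    }

    // Returns true if a sync happened, false if it was skipped.
    boolean maybeSync(boolean forceSync, long nowMs) {
        boolean noOffsetUpdates = committed.equals(consumed);
        // && binds tighter than ||, so the parentheses only make the
        // existing grouping of the condition in the diff explicit.
        if (noOffsetUpdates || (!forceSync && nowMs - lastSyncedTimeMs < syncIntervalMs)) {
            return false;
        }
        // Stand-in for committedOffsetsFile.writeEntries(partitionToConsumedOffsets).
        // Assumption: commit a copy, so later updates to `consumed` are detected.
        committed = new HashMap<>(consumed);
        lastSyncedTimeMs = nowMs;
        return true;
    }

    // Read-only view of the last committed offsets.
    Map<Integer, Long> committed() {
        return Collections.unmodifiableMap(committed);
    }
}
```

With this shape, `forceSync` bypasses only the interval check. A call with no new offsets is skipped even when it is forced.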
   





