satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r719295072



##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -120,6 +171,37 @@ public void run() {
         }
     }
 
+    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
+        boolean noOffsetUpdates = committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets);
+        if (noOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
+            log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, forceSync: {}", noOffsetUpdates, forceSync);
+            return;
+        }
+
+        try {
+            // partitionToConsumedOffsets is not getting changed concurrently as this method is called from #run() which updates the same.
+            // Need to take lock on assignPartitionsLock as assignedTopicPartitions might get updated by other threads.
+            synchronized (assignPartitionsLock) {
+                for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
+                    int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
+                    Long offset = partitionToConsumedOffsets.get(metadataPartition);
+                    if (offset != null && !offset.equals(committedPartitionToConsumedOffsets.get(metadataPartition))) {

Review comment:
       This is a known limitation that I want to address in the future. One
possible approach is to track the offset of the last processed event in both
the in-memory store and the snapshot file. If the two offsets are equal, we
skip taking a new snapshot and committing it.
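The approach described above can be sketched roughly as follows. This is a minimal, hypothetical illustration, not code from this PR: the class `SnapshotSyncSketch` and its methods are invented for the example, offsets are tracked per metadata partition in plain `HashMap`s, and writing the snapshot file and committing consumer offsets are stubbed out.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: remember the offset of the last processed metadata event
 * both in memory and in the latest snapshot, and skip snapshot + commit when the
 * two are equal. Names are illustrative only, not Kafka APIs.
 */
class SnapshotSyncSketch {
    // Offset of the last event applied to the in-memory store, per metadata partition.
    private final Map<Integer, Long> lastProcessedOffsets = new HashMap<>();
    // Offset recorded in the most recent snapshot file, per metadata partition.
    private final Map<Integer, Long> lastSnapshotOffsets = new HashMap<>();

    /** Called after the event at {@code offset} has been applied to the in-memory store. */
    void onEventProcessed(int metadataPartition, long offset) {
        lastProcessedOffsets.put(metadataPartition, offset);
    }

    /** True when processed events are not yet reflected in the snapshot. */
    boolean needsSnapshot(int metadataPartition) {
        Long processed = lastProcessedOffsets.get(metadataPartition);
        if (processed == null) {
            return false; // nothing processed yet, so nothing to snapshot
        }
        return !processed.equals(lastSnapshotOffsets.get(metadataPartition));
    }

    /** Takes a snapshot and commits only when needed; returns whether it did so. */
    boolean maybeSnapshotAndCommit(int metadataPartition) {
        if (!needsSnapshot(metadataPartition)) {
            return false; // in-memory offset and snapshot offset agree: skip
        }
        long offset = lastProcessedOffsets.get(metadataPartition);
        // Assumption: a real implementation would write the snapshot file (storing
        // this offset in it) and commit the consumer offset here. The sketch only
        // records the offset so the next call can detect that nothing changed.
        lastSnapshotOffsets.put(metadataPartition, offset);
        return true;
    }

    public static void main(String[] args) {
        SnapshotSyncSketch sketch = new SnapshotSyncSketch();
        sketch.onEventProcessed(0, 41L);
        System.out.println(sketch.maybeSnapshotAndCommit(0)); // true: new event since last snapshot
        System.out.println(sketch.maybeSnapshotAndCommit(0)); // false: offsets match, skipped
        sketch.onEventProcessed(0, 42L);
        System.out.println(sketch.maybeSnapshotAndCommit(0)); // true: another new event
    }
}
```

Persisting the offset inside the snapshot file itself (rather than only in memory) is what would let the check also hold across restarts: after reloading a snapshot, the recovered offset can be compared against the consumer's position before deciding whether a fresh snapshot is needed.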




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.