satishd commented on a change in pull request #11058: URL: https://github.com/apache/kafka/pull/11058#discussion_r719295072
########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ########## @@ -120,6 +171,37 @@ public void run() { } } + private void maybeSyncCommittedDataAndOffsets(boolean forceSync) { + boolean noOffsetUpdates = committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets); + if (noOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) { + log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, forceSync: {}", noOffsetUpdates, forceSync); + return; + } + + try { + // partitionToConsumedOffsets is not getting changed concurrently as this method is called from #run() which updates the same. + // Need to take lock on assignPartitionsLock as assignedTopicPartitions might get updated by other threads. + synchronized (assignPartitionsLock) { + for (TopicIdPartition topicIdPartition : assignedTopicPartitions) { + int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition); + Long offset = partitionToConsumedOffsets.get(metadataPartition); + if (offset != null && !offset.equals(committedPartitionToConsumedOffsets.get(metadataPartition))) { Review comment: This is a known limitation that I want to address in the future. One possible approach I was considering is to keep track of the last processed event’s offset in both the in-memory store and the snapshot file. If these two entries are the same, we will skip taking a new snapshot and committing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org