satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r714975169
##########
File path:
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -120,6 +174,35 @@ public void run() {
}
}
+    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
+        boolean noOffsetUpdates = committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets);
+        if (noOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
+            log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, forceSync: {}", noOffsetUpdates, forceSync);
+            return;
+        }
+
+        try {
+            HashMap<Integer, Long> syncedPartitionToConsumedOffsets = new HashMap<>();
+            for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
+                int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
+                Long offset = partitionToConsumedOffsets.get(metadataPartition);
+                if (offset != null && !offset.equals(committedPartitionToConsumedOffsets.get(metadataPartition))) {
+                    remotePartitionMetadataEventHandler.syncLogMetadataDataFile(topicIdPartition, metadataPartition, offset);
+                    syncedPartitionToConsumedOffsets.put(metadataPartition, offset);
+                } else {
+                    log.debug("Skipping syncup of the remote-log-metadata-file for partition:{} , with remote log metadata partition{}, and offset:{} ",
+                            topicIdPartition, metadataPartition, offset);
+                }
+            }
+
+            committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
Review comment:
The earlier check needs partitionToConsumedOffsets, and we do not really
need syncedPartitionToConsumedOffsets here. Updated to write
partitionToConsumedOffsets to the committed offsets file and to set
committedPartitionToConsumedOffsets to partitionToConsumedOffsets.
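
For illustration, a minimal standalone sketch of that update, not the actual
ConsumerTask code. The class name CommittedOffsetsSketch, the method
maybeSync, and the writtenEntries map (standing in for
committedOffsetsFile.writeEntries) are hypothetical. The sketch assumes the
committed map is kept as a copy. This comment does not say whether the PR
copies or assigns the reference, but with a shared instance the equality
check at the top would always report no updates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the sync logic discussed above.
class CommittedOffsetsSketch {
    // Offsets consumed so far, keyed by metadata partition.
    final Map<Integer, Long> partitionToConsumedOffsets = new HashMap<>();
    // Snapshot of the offsets that were last written out.
    Map<Integer, Long> committedPartitionToConsumedOffsets = new HashMap<>();
    // Stand-in for the committed offsets file: holds the last written entries.
    Map<Integer, Long> writtenEntries = new HashMap<>();

    // Returns true if a sync happened, false if there was nothing new to write.
    boolean maybeSync() {
        if (committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets)) {
            return false;
        }
        // Write the full consumed-offsets map, not only the entries that changed.
        writtenEntries = new HashMap<>(partitionToConsumedOffsets);
        // Keep a copy so later updates to the live map are detected next time.
        committedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets);
        return true;
    }
}
```

Writing the full map keeps the file contents and the committed snapshot
identical, so no separate map of synced entries is needed.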