shangxinli commented on code in PR #18125:
URL: https://github.com/apache/hudi/pull/18125#discussion_r2880979404


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:
##########
@@ -729,4 +733,153 @@ public static void validateWriteStatus(
       });
     }
   }
+
+  /**
+   * Extracts Kafka offset metadata from a Hudi commit.
+   *
+   * @param commitInstant The commit instant to extract metadata from
+   * @param timeline The timeline to read commit details from
+   * @return The Kafka offset metadata string, or null if not found
+   * @throws IOException if error reading commit metadata
+   */
+  public static String extractKafkaOffsetMetadata(final HoodieInstant 
commitInstant,
+                                                  final HoodieTimeline 
timeline) throws IOException {
+    HoodieCommitMetadata commitMetadata = 
TimelineUtils.getCommitMetadata(commitInstant, timeline);
+    return commitMetadata.getExtraMetadata().get(HOODIE_METADATA_KEY);
+  }
+
+  /**
+   * Parses Kafka offset string and restores partition offsets as Map.
+   *
+   * @param kafkaOffsetString URL-encoded Kafka offset string in format:
+   *        
"kafka_metadata%3Atopic%3Apartition:offset;kafka_metadata%3Atopic%3Apartition:offset"
+   * @return Map of partition ID to offset, or empty map if parsing fails
+   */
+  public static Map<Integer, Long> parseKafkaOffsets(final String 
kafkaOffsetString) {
+    Map<Integer, Long> partitionOffsets = new HashMap<>();
+
+    if (kafkaOffsetString == null || kafkaOffsetString.isEmpty()) {
+      return partitionOffsets;
+    }
+
+    try {
+      // Split by semicolon to get individual partition entries
+      String[] partitionEntries = kafkaOffsetString.split(PARTITION_SEPARATOR);
+
+      for (String entry : partitionEntries) {
+        entry = entry.trim();
+
+        // Skip cluster metadata entries (they don't contain partition:offset 
format)
+        if (!entry.contains(":") || entry.contains("kafka_cluster")) {
+          continue;
+        }
+
+        // Entry format: kafka_metadata%3Atopic%3Apartition:offset
+        // Find the last colon which separates partition and offset
+        int lastColonIndex = entry.lastIndexOf(':');
+        if (lastColonIndex == -1) {
+          continue;
+        }
+
+        String offsetStr = entry.substring(lastColonIndex + 1);
+        String beforeOffset = entry.substring(0, lastColonIndex);
+
+        // Find the partition number (everything after the last %3A before the 
colon)
+        int lastEncodedColonIndex = 
beforeOffset.lastIndexOf(URL_ENCODED_COLON);
+        if (lastEncodedColonIndex == -1) {
+          continue;
+        }
+
+        String partitionStr = beforeOffset.substring(lastEncodedColonIndex + 
URL_ENCODED_COLON.length());
+
+        try {
+          int partitionId = Integer.parseInt(partitionStr);
+          long offset = Long.parseLong(offsetStr);
+          partitionOffsets.put(partitionId, offset);
+        } catch (NumberFormatException e) {
+          log.warn("Failed to parse partition ID or offset from entry: {}", 
entry, e);
+        }
+      }
+    } catch (Exception e) {
+      log.error("Failed to parse Kafka offset string: {}", kafkaOffsetString, 
e);
+    }
+
+    return partitionOffsets;
+  }
+
+  /**
+   * Calculates the total difference count between Kafka offsets of two Hudi 
commits.
+   * The difference is: current commit's kafka offset minus previous commit's 
kafka offset
+   * for each partition. All partition differences are summed together.
+   *
+   * @param currentCommitInstant The current (newer) commit instant
+   * @param previousCommitInstant The previous (older) commit instant
+   * @param timeline The timeline to read commit details from
+   * @return The total count of offset differences across all partitions, or 0 
if calculation fails
+   */
+  public static long calculateKafkaOffsetDifference(
+      final HoodieInstant currentCommitInstant,
+      final HoodieInstant previousCommitInstant,
+      final HoodieTimeline timeline) {
+
+    if (currentCommitInstant == null || previousCommitInstant == null || 
timeline == null) {
+      log.warn("Invalid input parameters for calculating Kafka offset 
difference");
+      return 0L;
+    }
+
+    try {
+      // Extract Kafka offset metadata from both commits
+      String currentOffsetString = 
extractKafkaOffsetMetadata(currentCommitInstant, timeline);
+      String previousOffsetString = 
extractKafkaOffsetMetadata(previousCommitInstant, timeline);
+
+      if (currentOffsetString == null || previousOffsetString == null) {
+        log.warn("Kafka offset metadata not found in one or both commits: 
current={}, previous={}",
+            currentCommitInstant.requestedTime(), 
previousCommitInstant.requestedTime());
+        return 0L;
+      }
+
+      // Parse offset strings into partition -> offset maps
+      Map<Integer, Long> currentOffsets = 
parseKafkaOffsets(currentOffsetString);
+      Map<Integer, Long> previousOffsets = 
parseKafkaOffsets(previousOffsetString);
+
+      if (currentOffsets.isEmpty() || previousOffsets.isEmpty()) {
+        log.warn("Failed to parse Kafka offsets from commits: current={}, 
previous={}",
+            currentCommitInstant.requestedTime(), 
previousCommitInstant.requestedTime());
+        return 0L;
+      }
+
+      // Calculate total difference across all partitions
+      long totalDifference = 0L;
+
+      for (Map.Entry<Integer, Long> currentEntry : currentOffsets.entrySet()) {

Review Comment:
   A partition that is present in the previous commit but missing from the 
current one means no records were consumed from it in this batch, so its 
contribution to the offset difference is correctly 0 and it does not need to be 
included in the sum.

   However, a disappearing partition could signal a topic reconfiguration or a 
consumer rebalance issue, so I added a `log.warn` call to flag it. The diff 
calculation logic itself is unchanged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to