shangxinli commented on code in PR #18125:
URL: https://github.com/apache/hudi/pull/18125#discussion_r2880716221
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:
##########
@@ -729,4 +733,153 @@ public static void validateWriteStatus(
});
}
}
+
+ /**
+ * Extracts Kafka offset metadata from a Hudi commit.
+ *
+ * @param commitInstant The commit instant to extract metadata from
+ * @param timeline The timeline to read commit details from
+ * @return The Kafka offset metadata string, or null if not found
+ * @throws IOException if error reading commit metadata
+ */
+ public static String extractKafkaOffsetMetadata(final HoodieInstant
commitInstant,
+ final HoodieTimeline
timeline) throws IOException {
+ HoodieCommitMetadata commitMetadata =
TimelineUtils.getCommitMetadata(commitInstant, timeline);
+ return commitMetadata.getExtraMetadata().get(HOODIE_METADATA_KEY);
+ }
+
+ /**
+ * Parses Kafka offset string and restores partition offsets as Map.
+ *
+ * @param kafkaOffsetString URL-encoded Kafka offset string in format:
+ * "kafka_metadata%3Atopic%3Apartition:offset;kafka_metadata%3Atopic%3Apartition:offset"
+ * @return Map of partition ID to offset, or empty map if parsing fails
+ */
+ public static Map<Integer, Long> parseKafkaOffsets(final String kafkaOffsetString) {
Review Comment:
Good catch — made it package-private. It is only used internally by
`calculateKafkaOffsetDifference` within the same class, and the tests are in
the same package so they still have access.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:
##########
@@ -729,4 +733,153 @@ public static void validateWriteStatus(
});
}
}
+
+ /**
+ * Extracts Kafka offset metadata from a Hudi commit.
+ *
+ * @param commitInstant The commit instant to extract metadata from
+ * @param timeline The timeline to read commit details from
+ * @return The Kafka offset metadata string, or null if not found
+ * @throws IOException if error reading commit metadata
+ */
+ public static String extractKafkaOffsetMetadata(final HoodieInstant
commitInstant,
+ final HoodieTimeline
timeline) throws IOException {
+ HoodieCommitMetadata commitMetadata =
TimelineUtils.getCommitMetadata(commitInstant, timeline);
+ return commitMetadata.getExtraMetadata().get(HOODIE_METADATA_KEY);
+ }
+
+ /**
+ * Parses Kafka offset string and restores partition offsets as Map.
+ *
+ * @param kafkaOffsetString URL-encoded Kafka offset string in format:
+ *
"kafka_metadata%3Atopic%3Apartition:offset;kafka_metadata%3Atopic%3Apartition:offset"
+ * @return Map of partition ID to offset, or empty map if parsing fails
+ */
+ public static Map<Integer, Long> parseKafkaOffsets(final String
kafkaOffsetString) {
+ Map<Integer, Long> partitionOffsets = new HashMap<>();
+
+ if (kafkaOffsetString == null || kafkaOffsetString.isEmpty()) {
+ return partitionOffsets;
+ }
+
+ try {
+ // Split by semicolon to get individual partition entries
+ String[] partitionEntries = kafkaOffsetString.split(PARTITION_SEPARATOR);
+
+ for (String entry : partitionEntries) {
+ entry = entry.trim();
+
+ // Skip cluster metadata entries (they don't contain partition:offset
format)
+ if (!entry.contains(":") || entry.contains("kafka_cluster")) {
+ continue;
+ }
+
+ // Entry format: kafka_metadata%3Atopic%3Apartition:offset
+ // Find the last colon which separates partition and offset
+ int lastColonIndex = entry.lastIndexOf(':');
+ if (lastColonIndex == -1) {
+ continue;
+ }
+
+ String offsetStr = entry.substring(lastColonIndex + 1);
+ String beforeOffset = entry.substring(0, lastColonIndex);
+
+ // Find the partition number (everything after the last %3A before the
colon)
+ int lastEncodedColonIndex =
beforeOffset.lastIndexOf(URL_ENCODED_COLON);
+ if (lastEncodedColonIndex == -1) {
+ continue;
+ }
+
+ String partitionStr = beforeOffset.substring(lastEncodedColonIndex +
URL_ENCODED_COLON.length());
+
+ try {
+ int partitionId = Integer.parseInt(partitionStr);
+ long offset = Long.parseLong(offsetStr);
+ partitionOffsets.put(partitionId, offset);
+ } catch (NumberFormatException e) {
+ log.warn("Failed to parse partition ID or offset from entry: {}",
entry, e);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Failed to parse Kafka offset string: {}", kafkaOffsetString,
e);
+ }
+
+ return partitionOffsets;
+ }
+
+ /**
+ * Calculates the total difference count between Kafka offsets of two Hudi commits.
+ * The difference is: current commit's kafka offset minus previous commit's kafka offset
+ * for each partition. All partition differences are summed together.
+ *
+ * @param currentCommitInstant The current (newer) commit instant
+ * @param previousCommitInstant The previous (older) commit instant
+ * @param timeline The timeline to read commit details from
+ * @return The total count of offset differences across all partitions, or 0 if calculation fails
+ */
+ public static long calculateKafkaOffsetDifference(
Review Comment:
Yes, that is possible if the commit has been archived. Added a Javadoc note
that both commit instants must be present in the provided timeline — if a
commit has been archived and is no longer in the active timeline, this method
will fail. The caller is responsible for ensuring the instants are accessible.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:
##########
@@ -729,4 +733,153 @@ public static void validateWriteStatus(
});
}
}
+
+ /**
+ * Extracts Kafka offset metadata from a Hudi commit.
+ *
+ * @param commitInstant The commit instant to extract metadata from
+ * @param timeline The timeline to read commit details from
+ * @return The Kafka offset metadata string, or null if not found
+ * @throws IOException if error reading commit metadata
+ */
+ public static String extractKafkaOffsetMetadata(final HoodieInstant
commitInstant,
+ final HoodieTimeline
timeline) throws IOException {
+ HoodieCommitMetadata commitMetadata =
TimelineUtils.getCommitMetadata(commitInstant, timeline);
+ return commitMetadata.getExtraMetadata().get(HOODIE_METADATA_KEY);
+ }
+
+ /**
+ * Parses Kafka offset string and restores partition offsets as Map.
+ *
+ * @param kafkaOffsetString URL-encoded Kafka offset string in format:
+ *
"kafka_metadata%3Atopic%3Apartition:offset;kafka_metadata%3Atopic%3Apartition:offset"
+ * @return Map of partition ID to offset, or empty map if parsing fails
+ */
+ public static Map<Integer, Long> parseKafkaOffsets(final String
kafkaOffsetString) {
+ Map<Integer, Long> partitionOffsets = new HashMap<>();
+
+ if (kafkaOffsetString == null || kafkaOffsetString.isEmpty()) {
+ return partitionOffsets;
+ }
+
+ try {
+ // Split by semicolon to get individual partition entries
+ String[] partitionEntries = kafkaOffsetString.split(PARTITION_SEPARATOR);
+
+ for (String entry : partitionEntries) {
+ entry = entry.trim();
+
+ // Skip cluster metadata entries (they don't contain partition:offset
format)
+ if (!entry.contains(":") || entry.contains("kafka_cluster")) {
+ continue;
+ }
+
+ // Entry format: kafka_metadata%3Atopic%3Apartition:offset
+ // Find the last colon which separates partition and offset
+ int lastColonIndex = entry.lastIndexOf(':');
+ if (lastColonIndex == -1) {
+ continue;
+ }
+
+ String offsetStr = entry.substring(lastColonIndex + 1);
+ String beforeOffset = entry.substring(0, lastColonIndex);
+
+ // Find the partition number (everything after the last %3A before the
colon)
+ int lastEncodedColonIndex =
beforeOffset.lastIndexOf(URL_ENCODED_COLON);
+ if (lastEncodedColonIndex == -1) {
+ continue;
+ }
+
+ String partitionStr = beforeOffset.substring(lastEncodedColonIndex +
URL_ENCODED_COLON.length());
+
+ try {
+ int partitionId = Integer.parseInt(partitionStr);
+ long offset = Long.parseLong(offsetStr);
+ partitionOffsets.put(partitionId, offset);
+ } catch (NumberFormatException e) {
+ log.warn("Failed to parse partition ID or offset from entry: {}",
entry, e);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Failed to parse Kafka offset string: {}", kafkaOffsetString, e);
Review Comment:
You are right — changed to throw `HoodieException` instead of silently
swallowing the error. The individual per-entry `NumberFormatException` still
logs a warning and skips (for resilient partial parsing of individual malformed
entries), but any unexpected exception during overall parsing now propagates to
the caller.
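
For readers following the thread, here is a minimal sketch of the behavior described above, not the code from the PR. It assumes `PARTITION_SEPARATOR` is `";"` and `URL_ENCODED_COLON` is `"%3A"` (inferred from the Javadoc and inline comments in the hunk). `OffsetParseException` is a hypothetical stand-in for `HoodieException` so the example compiles without Hudi on the classpath, and `System.err` stands in for the logger.

```java
import java.util.HashMap;
import java.util.Map;

class KafkaOffsetParseSketch {
  // Assumed values, inferred from the Javadoc and comments in the diff.
  private static final String PARTITION_SEPARATOR = ";";
  private static final String URL_ENCODED_COLON = "%3A";

  /** Hypothetical stand-in for HoodieException. */
  static class OffsetParseException extends RuntimeException {
    OffsetParseException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  static Map<Integer, Long> parseKafkaOffsets(String kafkaOffsetString) {
    Map<Integer, Long> partitionOffsets = new HashMap<>();
    if (kafkaOffsetString == null || kafkaOffsetString.isEmpty()) {
      return partitionOffsets;
    }
    try {
      for (String rawEntry : kafkaOffsetString.split(PARTITION_SEPARATOR)) {
        String entry = rawEntry.trim();
        // The last plain colon separates the partition from the offset.
        int lastColon = entry.lastIndexOf(':');
        if (lastColon == -1 || entry.contains("kafka_cluster")) {
          continue; // cluster metadata or an entry without partition:offset
        }
        String beforeOffset = entry.substring(0, lastColon);
        int lastEncodedColon = beforeOffset.lastIndexOf(URL_ENCODED_COLON);
        if (lastEncodedColon == -1) {
          continue;
        }
        String partitionStr =
            beforeOffset.substring(lastEncodedColon + URL_ENCODED_COLON.length());
        String offsetStr = entry.substring(lastColon + 1);
        try {
          partitionOffsets.put(Integer.parseInt(partitionStr), Long.parseLong(offsetStr));
        } catch (NumberFormatException e) {
          // Per-entry failure: warn and skip, so one bad entry does not sink the rest.
          System.err.println("Skipping malformed entry: " + entry);
        }
      }
    } catch (RuntimeException e) {
      // Anything unexpected is wrapped and propagated instead of being swallowed.
      throw new OffsetParseException(
          "Failed to parse Kafka offset string: " + kafkaOffsetString, e);
    }
    return partitionOffsets;
  }

  public static void main(String[] args) {
    String input = "kafka_metadata%3Atopic%3A0:100;kafka_metadata%3Atopic%3A1:abc";
    System.out.println(parseKafkaOffsets(input));
  }
}
```

In this sketch an entry such as `kafka_metadata%3Atopic%3A1:abc` is skipped with a warning, while the well-formed entries around it are still returned.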
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java:
##########
@@ -729,4 +733,153 @@ public static void validateWriteStatus(
});
}
}
+
+ /**
+ * Extracts Kafka offset metadata from a Hudi commit.
+ *
+ * @param commitInstant The commit instant to extract metadata from
+ * @param timeline The timeline to read commit details from
+ * @return The Kafka offset metadata string, or null if not found
+ * @throws IOException if error reading commit metadata
+ */
+ public static String extractKafkaOffsetMetadata(final HoodieInstant
commitInstant,
+ final HoodieTimeline
timeline) throws IOException {
+ HoodieCommitMetadata commitMetadata =
TimelineUtils.getCommitMetadata(commitInstant, timeline);
+ return commitMetadata.getExtraMetadata().get(HOODIE_METADATA_KEY);
+ }
+
+ /**
+ * Parses Kafka offset string and restores partition offsets as Map.
+ *
+ * @param kafkaOffsetString URL-encoded Kafka offset string in format:
+ *
"kafka_metadata%3Atopic%3Apartition:offset;kafka_metadata%3Atopic%3Apartition:offset"
+ * @return Map of partition ID to offset, or empty map if parsing fails
+ */
+ public static Map<Integer, Long> parseKafkaOffsets(final String
kafkaOffsetString) {
+ Map<Integer, Long> partitionOffsets = new HashMap<>();
+
+ if (kafkaOffsetString == null || kafkaOffsetString.isEmpty()) {
+ return partitionOffsets;
+ }
+
+ try {
+ // Split by semicolon to get individual partition entries
+ String[] partitionEntries = kafkaOffsetString.split(PARTITION_SEPARATOR);
+
+ for (String entry : partitionEntries) {
+ entry = entry.trim();
+
+ // Skip cluster metadata entries (they don't contain partition:offset
format)
+ if (!entry.contains(":") || entry.contains("kafka_cluster")) {
+ continue;
+ }
+
+ // Entry format: kafka_metadata%3Atopic%3Apartition:offset
+ // Find the last colon which separates partition and offset
+ int lastColonIndex = entry.lastIndexOf(':');
+ if (lastColonIndex == -1) {
+ continue;
+ }
+
+ String offsetStr = entry.substring(lastColonIndex + 1);
+ String beforeOffset = entry.substring(0, lastColonIndex);
+
+ // Find the partition number (everything after the last %3A before the
colon)
+ int lastEncodedColonIndex =
beforeOffset.lastIndexOf(URL_ENCODED_COLON);
+ if (lastEncodedColonIndex == -1) {
+ continue;
+ }
+
+ String partitionStr = beforeOffset.substring(lastEncodedColonIndex +
URL_ENCODED_COLON.length());
+
+ try {
+ int partitionId = Integer.parseInt(partitionStr);
+ long offset = Long.parseLong(offsetStr);
+ partitionOffsets.put(partitionId, offset);
+ } catch (NumberFormatException e) {
+ log.warn("Failed to parse partition ID or offset from entry: {}",
entry, e);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Failed to parse Kafka offset string: {}", kafkaOffsetString,
e);
+ }
+
+ return partitionOffsets;
+ }
+
+ /**
+ * Calculates the total difference count between Kafka offsets of two Hudi
commits.
+ * The difference is: current commit's kafka offset minus previous commit's
kafka offset
+ * for each partition. All partition differences are summed together.
+ *
+ * @param currentCommitInstant The current (newer) commit instant
+ * @param previousCommitInstant The previous (older) commit instant
+ * @param timeline The timeline to read commit details from
+ * @return The total count of offset differences across all partitions, or 0
if calculation fails
+ */
+ public static long calculateKafkaOffsetDifference(
+ final HoodieInstant currentCommitInstant,
+ final HoodieInstant previousCommitInstant,
+ final HoodieTimeline timeline) {
+
+ if (currentCommitInstant == null || previousCommitInstant == null ||
timeline == null) {
+ log.warn("Invalid input parameters for calculating Kafka offset
difference");
+ return 0L;
+ }
+
+ try {
+ // Extract Kafka offset metadata from both commits
+ String currentOffsetString =
extractKafkaOffsetMetadata(currentCommitInstant, timeline);
+ String previousOffsetString =
extractKafkaOffsetMetadata(previousCommitInstant, timeline);
+
+ if (currentOffsetString == null || previousOffsetString == null) {
+ log.warn("Kafka offset metadata not found in one or both commits:
current={}, previous={}",
+ currentCommitInstant.requestedTime(),
previousCommitInstant.requestedTime());
+ return 0L;
+ }
+
+ // Parse offset strings into partition -> offset maps
+ Map<Integer, Long> currentOffsets =
parseKafkaOffsets(currentOffsetString);
+ Map<Integer, Long> previousOffsets =
parseKafkaOffsets(previousOffsetString);
+
+ if (currentOffsets.isEmpty() || previousOffsets.isEmpty()) {
+ log.warn("Failed to parse Kafka offsets from commits: current={},
previous={}",
+ currentCommitInstant.requestedTime(),
previousCommitInstant.requestedTime());
+ return 0L;
+ }
+
+ // Calculate total difference across all partitions
+ long totalDifference = 0L;
+
+ for (Map.Entry<Integer, Long> currentEntry : currentOffsets.entrySet()) {
+ Integer partitionId = currentEntry.getKey();
+ Long currentOffset = currentEntry.getValue();
+ Long previousOffset = previousOffsets.get(partitionId);
+
+ // If partition doesn't exist in previous commit (new partition), use
0 as base
+ if (previousOffset == null) {
+ previousOffset = 0L;
+ log.debug("New partition {} detected, using 0 as previous offset
baseline",
+ partitionId);
+ }
+
+ long difference = currentOffset - previousOffset;
+ totalDifference += difference;
+
+ log.debug("Partition {} offset difference: {} - {} = {}",
+ partitionId, currentOffset, previousOffset, difference);
+ }
+
+ log.info("Total Kafka offset difference between commits {} and {}: {}",
+ currentCommitInstant.requestedTime(),
previousCommitInstant.requestedTime(),
+ totalDifference);
+
+ return totalDifference;
+
+ } catch (Exception e) {
+ log.error("Failed to calculate Kafka offset difference between commits {} and {}",
+ currentCommitInstant.requestedTime(), previousCommitInstant.requestedTime(), e);
+ return 0L;
Review Comment:
Agreed — returning 0 silently masks real failures. Changed all error paths
to throw:
- Null parameters → `IllegalArgumentException`
- Missing metadata → `HoodieException`
- Empty parsed offsets → `HoodieException`
- `IOException` from metadata extraction now propagates directly
Updated tests to assert exceptions for these cases instead of expecting 0.
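
To illustrate the fail-fast contract listed above, here is a rough sketch, not the PR code. It works on already-parsed partition-to-offset maps rather than on `HoodieInstant` and `HoodieTimeline`, so the missing-metadata and `IOException` paths are not modeled. `OffsetMetadataException` is a hypothetical stand-in for `HoodieException`, and the class and method names are made up for the example.

```java
import java.util.Map;

class KafkaOffsetDiffSketch {
  /** Hypothetical stand-in for HoodieException. */
  static class OffsetMetadataException extends RuntimeException {
    OffsetMetadataException(String message) {
      super(message);
    }
  }

  static long calculateOffsetDifference(Map<Integer, Long> currentOffsets,
                                        Map<Integer, Long> previousOffsets) {
    // Null inputs are a caller bug, so they fail with IllegalArgumentException.
    if (currentOffsets == null || previousOffsets == null) {
      throw new IllegalArgumentException("Offset maps must not be null");
    }
    // Empty parsed offsets signal unusable metadata rather than "zero records".
    if (currentOffsets.isEmpty() || previousOffsets.isEmpty()) {
      throw new OffsetMetadataException("No Kafka offsets parsed from one or both commits");
    }
    long totalDifference = 0L;
    for (Map.Entry<Integer, Long> current : currentOffsets.entrySet()) {
      // As in the diff, a partition absent from the previous commit uses 0 as its baseline.
      long previous = previousOffsets.getOrDefault(current.getKey(), 0L);
      totalDifference += current.getValue() - previous;
    }
    return totalDifference;
  }

  public static void main(String[] args) {
    System.out.println(
        calculateOffsetDifference(Map.of(0, 150L, 1, 80L), Map.of(0, 100L)));
  }
}
```

With this shape, a caller can tell "no new records" (a genuine return value of 0) apart from "could not compute" (an exception), which is the ambiguity the review comment was about.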
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.