This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3abe066f3ad5 feat(common): per-partition event-time rollup and
decouple watermark tracking from EVENT_TIME_ORDERING (#18778)
3abe066f3ad5 is described below
commit 3abe066f3ad570af53e5503a79c6e444deaec346
Author: Xinli Shang <[email protected]>
AuthorDate: Tue May 19 20:45:55 2026 -0700
feat(common): per-partition event-time rollup and decouple watermark
tracking from EVENT_TIME_ORDERING (#18778)
Expose the per-partition event-time rollup that is already latent on disk
and stop gating watermark tracking on EVENT_TIME_ORDERING so freshness
observability works for COW / COMMIT_TIME_ORDERING tables.
Changes:
- HoodieCommitMetadata.getMinAndMaxEventTimePerPartition(): pure aggregation
over partitionToWriteStats, returning a
Map<String, Pair<Option<Long>, Option<Long>>> of (min, max) event time keyed
by partition. It adds no persisted bytes and does not change the commit Avro
schema. Partitions whose write stats carry no event time are omitted.
- HoodieWriteHandle: drop the recordMergeMode == EVENT_TIME_ORDERING check
from isTrackingEventTimeWatermark. Tracking now activates whenever the
event-time field is configured and hoodie.write.track.event.time.watermark
is true, independent of merge mode.
- Tests: 5 new TestHoodieCommitMetadata cases for the rollup API; update
TestHoodieWriteHandle to assert the new merge-mode-independent behavior
and add a missing-event-time-field negative case.
Part of #17512 (Phase 1 of the reconcile plan). No behavior change for
tables that have not opted into hoodie.write.track.event.time.watermark.
Co-authored-by: Xinli Shang <[email protected]>
---
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 5 +-
.../org/apache/hudi/io/TestHoodieWriteHandle.java | 13 +++-
.../hudi/common/model/HoodieCommitMetadata.java | 36 +++++++++
.../common/model/TestHoodieCommitMetadata.java | 85 ++++++++++++++++++++++
4 files changed, 135 insertions(+), 4 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index cd22186aa8e0..b1d8e4305971 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -66,7 +66,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
-import static
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
@@ -140,9 +139,11 @@ public abstract class HoodieWriteHandle<T, I, K, O>
extends HoodieIOHandle<T, I,
}
// For tracking event time watermark.
+ // Watermark tracking is independent of the record merge mode: pure
freshness
+ // observability is orthogonal to merge semantics, so COW /
COMMIT_TIME_ORDERING
+ // tables that opt in still populate per-write-stat min/max event time.
this.eventTimeFieldName =
ConfigUtils.getEventTimeFieldName(config.getProps());
this.isTrackingEventTimeWatermark = this.eventTimeFieldName != null
- && hoodieTable.getMetaClient().getTableConfig().getRecordMergeMode()
== EVENT_TIME_ORDERING
&& ConfigUtils.isTrackingEventTimeWatermark(config.getProps());
this.keepConsistentLogicalTimestamp = isTrackingEventTimeWatermark &&
ConfigUtils.shouldKeepConsistentLogicalTimestamp(config.getProps());
TypedProperties mergeProps = ConfigUtils.getMergeProps(config.getProps(),
hoodieTable.getMetaClient().getTableConfig());
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
index b37f5989e57c..34d9023f57a7 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
@@ -122,10 +122,19 @@ class TestHoodieWriteHandle {
@Test
void
testShouldTrackEventTimeWaterMarkerAvroRecordTypeWithCommitTimeOrdering() {
- // Setup: AVRO record type but with commit time ordering
+ // Watermark tracking is independent of merge mode: COMMIT_TIME_ORDERING +
event time field
+ // + config enabled should still track, so freshness observability works
for COW tables.
+ boolean result = mockWriteHandle(true, "ts", false,
HoodieRecord.HoodieRecordType.AVRO, RecordMergeMode.COMMIT_TIME_ORDERING)
+ .isTrackingEventTimeWaterMarker();
+ assertTrue(result, "Should track event time watermark when commit time
ordering is used but config + event time field are set");
+ }
+
+ @Test
+ void testShouldNotTrackEventTimeWaterMarkerWhenEventTimeFieldMissing() {
+ // No event time field configured -> no watermark, regardless of merge
mode.
boolean result = mockWriteHandle(true, null, false,
HoodieRecord.HoodieRecordType.AVRO, RecordMergeMode.COMMIT_TIME_ORDERING)
.isTrackingEventTimeWaterMarker();
- assertFalse(result, "Should not track event time watermark when using
commit time ordering");
+ assertFalse(result, "Should not track event time watermark when event time
field is not configured");
}
@Test
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 51171b83b0e6..ac76a3b396f3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -497,6 +497,42 @@ public class HoodieCommitMetadata implements Serializable {
maxEventTime == Long.MIN_VALUE ? Option.empty() :
Option.of(maxEventTime));
}
+ /**
+ * Returns per-partition min/max event time folded across this commit's
write stats.
+ *
+ * <p>Each entry corresponds to a partition for which at least one write
stat carried a
+ * non-null {@link HoodieWriteStat#getMinEventTime()} or {@link
HoodieWriteStat#getMaxEventTime()}.
+ * Partitions whose write stats have no event-time information at all are
omitted from the
+ * returned map. The min/max within a partition are folded with {@code
Math.min} / {@code Math.max}
+ * over its write stats, mirroring the semantics of {@link
#getMinAndMaxEventTime()}.
+ *
+ * <p>This is a pure aggregation over {@code partitionToWriteStats} — it
adds no persisted
+ * bytes and does not change the commit avro schema.
+ */
+ public Map<String, Pair<Option<Long>, Option<Long>>>
getMinAndMaxEventTimePerPartition() {
+ Map<String, Pair<Option<Long>, Option<Long>>> result = new HashMap<>();
+ for (Map.Entry<String, List<HoodieWriteStat>> entry :
partitionToWriteStats.entrySet()) {
+ long minEventTime = Long.MAX_VALUE;
+ long maxEventTime = Long.MIN_VALUE;
+ for (HoodieWriteStat writeStat : entry.getValue()) {
+ if (writeStat.getMinEventTime() != null) {
+ minEventTime = Math.min(writeStat.getMinEventTime(), minEventTime);
+ }
+ if (writeStat.getMaxEventTime() != null) {
+ maxEventTime = Math.max(writeStat.getMaxEventTime(), maxEventTime);
+ }
+ }
+ if (minEventTime != Long.MAX_VALUE || maxEventTime != Long.MIN_VALUE) {
+ result.put(
+ entry.getKey(),
+ Pair.of(
+ minEventTime == Long.MAX_VALUE ? Option.empty() :
Option.of(minEventTime),
+ maxEventTime == Long.MIN_VALUE ? Option.empty() :
Option.of(maxEventTime)));
+ }
+ }
+ return result;
+ }
+
public HashSet<String> getWritePartitionPaths() {
return new HashSet<>(partitionToWriteStats.keySet());
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
index 0c4781427f8d..293a5b093bc4 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
@@ -378,4 +378,89 @@ public class TestHoodieCommitMetadata {
writeStat.setNumDeletes(0);
return writeStat;
}
+
+ @Test
+ public void
testGetMinAndMaxEventTimePerPartition_FoldsAcrossStatsWithinPartition() {
+ // Two stats in the same partition: per-partition rollup folds min/max
across them.
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ commitMetadata.addWriteStat("partition1",
createWriteStatWithEventTime("partition1", "f1", 100L, 200L));
+ commitMetadata.addWriteStat("partition1",
createWriteStatWithEventTime("partition1", "f2", 50L, 300L));
+ commitMetadata.addWriteStat("partition2",
createWriteStatWithEventTime("partition2", "f3", 1000L, 2000L));
+
+ Map<String, Pair<Option<Long>, Option<Long>>> result =
commitMetadata.getMinAndMaxEventTimePerPartition();
+
+ assertEquals(2, result.size());
+ assertEquals(Option.of(50L), result.get("partition1").getLeft());
+ assertEquals(Option.of(300L), result.get("partition1").getRight());
+ assertEquals(Option.of(1000L), result.get("partition2").getLeft());
+ assertEquals(Option.of(2000L), result.get("partition2").getRight());
+ }
+
+ @Test
+ public void
testGetMinAndMaxEventTimePerPartition_OmitsPartitionsWithoutEventTime() {
+ // A partition whose stats carry no event time should not appear in the
result map.
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ commitMetadata.addWriteStat("partition1",
createWriteStatWithEventTime("partition1", "f1", 100L, 200L));
+ commitMetadata.addWriteStat("partition2",
createWriteStatWithEventTime("partition2", "f2", null, null));
+
+ Map<String, Pair<Option<Long>, Option<Long>>> result =
commitMetadata.getMinAndMaxEventTimePerPartition();
+
+ assertEquals(1, result.size());
+ assertTrue(result.containsKey("partition1"));
+ assertEquals(Option.of(100L), result.get("partition1").getLeft());
+ assertEquals(Option.of(200L), result.get("partition1").getRight());
+ }
+
+ @Test
+ public void testGetMinAndMaxEventTimePerPartition_HandlesPartialEventTime() {
+ // Stats may carry only one of min/max (e.g. when folding across writers);
the missing
+ // side should surface as Option.empty() in the rollup, the present side
as Option.of(...).
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ commitMetadata.addWriteStat("partition1",
createWriteStatWithEventTime("partition1", "f1", 100L, null));
+ commitMetadata.addWriteStat("partition1",
createWriteStatWithEventTime("partition1", "f2", null, 500L));
+
+ Map<String, Pair<Option<Long>, Option<Long>>> result =
commitMetadata.getMinAndMaxEventTimePerPartition();
+
+ assertEquals(1, result.size());
+ assertEquals(Option.of(100L), result.get("partition1").getLeft());
+ assertEquals(Option.of(500L), result.get("partition1").getRight());
+ }
+
+ @Test
+ public void testGetMinAndMaxEventTimePerPartition_EmptyMetadata() {
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ assertTrue(commitMetadata.getMinAndMaxEventTimePerPartition().isEmpty());
+ }
+
+ @Test
+ public void testGetMinAndMaxEventTimePerPartition_MatchesGlobalRollup() {
+ // Sanity: collapsing the per-partition rollup yields the same min/max as
the global getter.
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ commitMetadata.addWriteStat("partition1",
createWriteStatWithEventTime("partition1", "f1", 100L, 200L));
+ commitMetadata.addWriteStat("partition2",
createWriteStatWithEventTime("partition2", "f2", 50L, 300L));
+ commitMetadata.addWriteStat("partition3",
createWriteStatWithEventTime("partition3", "f3", 75L, 150L));
+
+ Pair<Option<Long>, Option<Long>> global =
commitMetadata.getMinAndMaxEventTime();
+ Map<String, Pair<Option<Long>, Option<Long>>> perPartition =
commitMetadata.getMinAndMaxEventTimePerPartition();
+
+ long minAcrossPartitions = perPartition.values().stream()
+ .mapToLong(p -> p.getLeft().orElse(Long.MAX_VALUE))
+ .min().getAsLong();
+ long maxAcrossPartitions = perPartition.values().stream()
+ .mapToLong(p -> p.getRight().orElse(Long.MIN_VALUE))
+ .max().getAsLong();
+
+ assertEquals(global.getLeft(), Option.of(minAcrossPartitions));
+ assertEquals(global.getRight(), Option.of(maxAcrossPartitions));
+ }
+
+ private HoodieWriteStat createWriteStatWithEventTime(String partitionPath,
String fileId, Long minEventTime, Long maxEventTime) {
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setPartitionPath(partitionPath);
+ writeStat.setFileId(fileId);
+ writeStat.setPath(partitionPath + "/" + fileId + ".parquet");
+ writeStat.setMinEventTime(minEventTime);
+ writeStat.setMaxEventTime(maxEventTime);
+ return writeStat;
+ }
}