This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3abe066f3ad5 feat(common): per-partition event-time rollup and decouple watermark tracking from EVENT_TIME_ORDERING (#18778)
3abe066f3ad5 is described below

commit 3abe066f3ad570af53e5503a79c6e444deaec346
Author: Xinli Shang <[email protected]>
AuthorDate: Tue May 19 20:45:55 2026 -0700

    feat(common): per-partition event-time rollup and decouple watermark tracking from EVENT_TIME_ORDERING (#18778)
    
    Expose a per-partition event-time rollup that is already derivable from the
    per-file write stats persisted in the commit metadata, and stop gating
    watermark tracking on EVENT_TIME_ORDERING so that freshness observability
    also works for COW / COMMIT_TIME_ORDERING tables.
    
    Changes:
    - HoodieCommitMetadata.getMinAndMaxEventTimePerPartition(): pure aggregation
      over partitionToWriteStats returning
      Map<String, Pair<Option<Long>, Option<Long>>> (partition -> (min, max) event time).
      No persisted bytes, no avro schema change. Partitions whose stats carry no
      event time are omitted.
    - HoodieWriteHandle: drop the recordMergeMode == EVENT_TIME_ORDERING check
      from isTrackingEventTimeWatermark. Tracking now activates whenever the
      event-time field is configured and hoodie.write.track.event.time.watermark
      is true, independent of merge mode.
    - Tests: 5 new TestHoodieCommitMetadata cases for the rollup API; update
      TestHoodieWriteHandle to assert the new merge-mode-independent behavior
      and add a missing-event-time-field negative case.
    
    Part of #17512 (Phase 1 of the reconcile plan). No behavior change for
    tables that have not opted into hoodie.write.track.event.time.watermark.
    
    Co-authored-by: Xinli Shang <[email protected]>
---
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  5 +-
 .../org/apache/hudi/io/TestHoodieWriteHandle.java  | 13 +++-
 .../hudi/common/model/HoodieCommitMetadata.java    | 36 +++++++++
 .../common/model/TestHoodieCommitMetadata.java     | 85 ++++++++++++++++++++++
 4 files changed, 135 insertions(+), 4 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index cd22186aa8e0..b1d8e4305971 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -66,7 +66,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 
@@ -140,9 +139,11 @@ public abstract class HoodieWriteHandle<T, I, K, O> 
extends HoodieIOHandle<T, I,
     }
 
     // For tracking event time watermark.
+    // Watermark tracking is independent of the record merge mode: pure 
freshness
+    // observability is orthogonal to merge semantics, so COW / 
COMMIT_TIME_ORDERING
+    // tables that opt in still populate per-write-stat min/max event time.
     this.eventTimeFieldName = 
ConfigUtils.getEventTimeFieldName(config.getProps());
     this.isTrackingEventTimeWatermark = this.eventTimeFieldName != null
-        && hoodieTable.getMetaClient().getTableConfig().getRecordMergeMode() 
== EVENT_TIME_ORDERING
         && ConfigUtils.isTrackingEventTimeWatermark(config.getProps());
     this.keepConsistentLogicalTimestamp = isTrackingEventTimeWatermark && 
ConfigUtils.shouldKeepConsistentLogicalTimestamp(config.getProps());
     TypedProperties mergeProps = ConfigUtils.getMergeProps(config.getProps(), 
hoodieTable.getMetaClient().getTableConfig());
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
index b37f5989e57c..34d9023f57a7 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/TestHoodieWriteHandle.java
@@ -122,10 +122,19 @@ class TestHoodieWriteHandle {
 
   @Test
   void 
testShouldTrackEventTimeWaterMarkerAvroRecordTypeWithCommitTimeOrdering() {
-    // Setup: AVRO record type but with commit time ordering
+    // Watermark tracking is independent of merge mode: COMMIT_TIME_ORDERING + 
event time field
+    // + config enabled should still track, so freshness observability works 
for COW tables.
+    boolean result = mockWriteHandle(true, "ts", false, 
HoodieRecord.HoodieRecordType.AVRO, RecordMergeMode.COMMIT_TIME_ORDERING)
+        .isTrackingEventTimeWaterMarker();
+    assertTrue(result, "Should track event time watermark when commit time 
ordering is used but config + event time field are set");
+  }
+
+  @Test
+  void testShouldNotTrackEventTimeWaterMarkerWhenEventTimeFieldMissing() {
+    // No event time field configured -> no watermark, regardless of merge 
mode.
     boolean result = mockWriteHandle(true, null, false, 
HoodieRecord.HoodieRecordType.AVRO, RecordMergeMode.COMMIT_TIME_ORDERING)
         .isTrackingEventTimeWaterMarker();
-    assertFalse(result, "Should not track event time watermark when using 
commit time ordering");
+    assertFalse(result, "Should not track event time watermark when event time 
field is not configured");
   }
 
   @Test
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 51171b83b0e6..ac76a3b396f3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -497,6 +497,42 @@ public class HoodieCommitMetadata implements Serializable {
         maxEventTime == Long.MIN_VALUE ? Option.empty() : 
Option.of(maxEventTime));
   }
 
+  /**
+   * Returns per-partition min/max event time folded across this commit's 
write stats.
+   *
+   * <p>Each entry corresponds to a partition for which at least one write 
stat carried a
+   * non-null {@link HoodieWriteStat#getMinEventTime()} or {@link 
HoodieWriteStat#getMaxEventTime()}.
+   * Partitions whose write stats have no event-time information at all are 
omitted from the
+   * returned map. The min/max within a partition are folded with {@code 
Math.min} / {@code Math.max}
+   * over its write stats, mirroring the semantics of {@link 
#getMinAndMaxEventTime()}.
+   *
+   * <p>This is a pure aggregation over {@code partitionToWriteStats} — it 
adds no persisted
+   * bytes and does not change the commit avro schema.
+   */
+  public Map<String, Pair<Option<Long>, Option<Long>>> 
getMinAndMaxEventTimePerPartition() {
+    Map<String, Pair<Option<Long>, Option<Long>>> result = new HashMap<>();
+    for (Map.Entry<String, List<HoodieWriteStat>> entry : 
partitionToWriteStats.entrySet()) {
+      long minEventTime = Long.MAX_VALUE;
+      long maxEventTime = Long.MIN_VALUE;
+      for (HoodieWriteStat writeStat : entry.getValue()) {
+        if (writeStat.getMinEventTime() != null) {
+          minEventTime = Math.min(writeStat.getMinEventTime(), minEventTime);
+        }
+        if (writeStat.getMaxEventTime() != null) {
+          maxEventTime = Math.max(writeStat.getMaxEventTime(), maxEventTime);
+        }
+      }
+      if (minEventTime != Long.MAX_VALUE || maxEventTime != Long.MIN_VALUE) {
+        result.put(
+            entry.getKey(),
+            Pair.of(
+                minEventTime == Long.MAX_VALUE ? Option.empty() : 
Option.of(minEventTime),
+                maxEventTime == Long.MIN_VALUE ? Option.empty() : 
Option.of(maxEventTime)));
+      }
+    }
+    return result;
+  }
+
   public HashSet<String> getWritePartitionPaths() {
     return new HashSet<>(partitionToWriteStats.keySet());
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
index 0c4781427f8d..293a5b093bc4 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
@@ -378,4 +378,89 @@ public class TestHoodieCommitMetadata {
     writeStat.setNumDeletes(0);
     return writeStat;
   }
+
+  @Test
+  public void 
testGetMinAndMaxEventTimePerPartition_FoldsAcrossStatsWithinPartition() {
+    // Two stats in the same partition: per-partition rollup folds min/max 
across them.
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addWriteStat("partition1", 
createWriteStatWithEventTime("partition1", "f1", 100L, 200L));
+    commitMetadata.addWriteStat("partition1", 
createWriteStatWithEventTime("partition1", "f2", 50L, 300L));
+    commitMetadata.addWriteStat("partition2", 
createWriteStatWithEventTime("partition2", "f3", 1000L, 2000L));
+
+    Map<String, Pair<Option<Long>, Option<Long>>> result = 
commitMetadata.getMinAndMaxEventTimePerPartition();
+
+    assertEquals(2, result.size());
+    assertEquals(Option.of(50L), result.get("partition1").getLeft());
+    assertEquals(Option.of(300L), result.get("partition1").getRight());
+    assertEquals(Option.of(1000L), result.get("partition2").getLeft());
+    assertEquals(Option.of(2000L), result.get("partition2").getRight());
+  }
+
+  @Test
+  public void 
testGetMinAndMaxEventTimePerPartition_OmitsPartitionsWithoutEventTime() {
+    // A partition whose stats carry no event time should not appear in the 
result map.
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addWriteStat("partition1", 
createWriteStatWithEventTime("partition1", "f1", 100L, 200L));
+    commitMetadata.addWriteStat("partition2", 
createWriteStatWithEventTime("partition2", "f2", null, null));
+
+    Map<String, Pair<Option<Long>, Option<Long>>> result = 
commitMetadata.getMinAndMaxEventTimePerPartition();
+
+    assertEquals(1, result.size());
+    assertTrue(result.containsKey("partition1"));
+    assertEquals(Option.of(100L), result.get("partition1").getLeft());
+    assertEquals(Option.of(200L), result.get("partition1").getRight());
+  }
+
+  @Test
+  public void testGetMinAndMaxEventTimePerPartition_HandlesPartialEventTime() {
+    // Stats may carry only one of min/max (e.g. when folding across writers); 
the missing
+    // side should surface as Option.empty() in the rollup, the present side 
as Option.of(...).
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addWriteStat("partition1", 
createWriteStatWithEventTime("partition1", "f1", 100L, null));
+    commitMetadata.addWriteStat("partition1", 
createWriteStatWithEventTime("partition1", "f2", null, 500L));
+
+    Map<String, Pair<Option<Long>, Option<Long>>> result = 
commitMetadata.getMinAndMaxEventTimePerPartition();
+
+    assertEquals(1, result.size());
+    assertEquals(Option.of(100L), result.get("partition1").getLeft());
+    assertEquals(Option.of(500L), result.get("partition1").getRight());
+  }
+
+  @Test
+  public void testGetMinAndMaxEventTimePerPartition_EmptyMetadata() {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    assertTrue(commitMetadata.getMinAndMaxEventTimePerPartition().isEmpty());
+  }
+
+  @Test
+  public void testGetMinAndMaxEventTimePerPartition_MatchesGlobalRollup() {
+    // Sanity: collapsing the per-partition rollup yields the same min/max as 
the global getter.
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addWriteStat("partition1", 
createWriteStatWithEventTime("partition1", "f1", 100L, 200L));
+    commitMetadata.addWriteStat("partition2", 
createWriteStatWithEventTime("partition2", "f2", 50L, 300L));
+    commitMetadata.addWriteStat("partition3", 
createWriteStatWithEventTime("partition3", "f3", 75L, 150L));
+
+    Pair<Option<Long>, Option<Long>> global = 
commitMetadata.getMinAndMaxEventTime();
+    Map<String, Pair<Option<Long>, Option<Long>>> perPartition = 
commitMetadata.getMinAndMaxEventTimePerPartition();
+
+    long minAcrossPartitions = perPartition.values().stream()
+        .mapToLong(p -> p.getLeft().orElse(Long.MAX_VALUE))
+        .min().getAsLong();
+    long maxAcrossPartitions = perPartition.values().stream()
+        .mapToLong(p -> p.getRight().orElse(Long.MIN_VALUE))
+        .max().getAsLong();
+
+    assertEquals(global.getLeft(), Option.of(minAcrossPartitions));
+    assertEquals(global.getRight(), Option.of(maxAcrossPartitions));
+  }
+
+  private HoodieWriteStat createWriteStatWithEventTime(String partitionPath, 
String fileId, Long minEventTime, Long maxEventTime) {
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setPartitionPath(partitionPath);
+    writeStat.setFileId(fileId);
+    writeStat.setPath(partitionPath + "/" + fileId + ".parquet");
+    writeStat.setMinEventTime(minEventTime);
+    writeStat.setMaxEventTime(maxEventTime);
+    return writeStat;
+  }
 }

Reply via email to