This is an automated email from the ASF dual-hosted git repository.
pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e1ae9c6ac2fe feat(flink): collect event time in
HoodieRowDataCreateHandle for min/max event time metrics (#18250)
e1ae9c6ac2fe is described below
commit e1ae9c6ac2fef5c665ee01097b1cb403cfb44cae
Author: Jianchun Xu <[email protected]>
AuthorDate: Fri Feb 27 10:25:49 2026 -0800
feat(flink): collect event time in HoodieRowDataCreateHandle for min/max
event time metrics (#18250)
* feat(flink): collect event time in HoodieRowDataCreateHandle for min/max
event time metrics
- Add event time field index from hoodie.payload.event.time.field (DOUBLE,
epoch seconds)
- Extract event time in write() when !skipMetadataWrite and pass to
WriteStatus.markSuccess
- Enables Flink ingestion min/max event time in commit stats for latency
metrics
- Add TestHoodieRowDataCreateHandle for event time field index and
extraction
Made-with: Cursor
* feat(flink): support DOUBLE and BIGINT event time in
HoodieRowDataCreateHandle
- Use RowData.FieldGetter for event time field
(hoodie.payload.event.time.field)
- Support both DOUBLE and BIGINT; getter is null when field not configured
or unsupported type
- Extract value as long then string (no unit interpretation; WriteStatus
infers by length)
- Add defensive try-catch in extractEventTimeMetadata for schema mismatch
edge cases
- Add testEventTimeExtractionWithBigintMillis for BIGINT (millis) coverage
Made-with: Cursor
---
.../io/storage/row/HoodieRowDataCreateHandle.java | 61 ++-
.../storage/row/TestHoodieRowDataCreateHandle.java | 439 +++++++++++++++++++++
2 files changed, 499 insertions(+), 1 deletion(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index eb4954cc6b3b..45333cf4b5db 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.model.HoodieRowData;
import org.apache.hudi.client.model.HoodieRowDataCreation;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -42,13 +43,17 @@ import org.apache.hudi.table.marker.WriteMarkersFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
/**
@@ -78,6 +83,9 @@ public class HoodieRowDataCreateHandle implements
Serializable {
private final HoodieTimer currTimer;
+ // Event time tracking for min/max event time metrics (when hoodie.payload.event.time.field is set)
+ private final RowData.FieldGetter eventTimeFieldGetter;
+
public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig
writeConfig, String partitionPath, String fileId,
String instantTime, int taskPartitionId,
long taskId, long taskEpochId,
RowType rowType, boolean
preserveHoodieMetadata, boolean skipMetadataWrite) {
@@ -95,6 +103,7 @@ public class HoodieRowDataCreateHandle implements
Serializable {
this.currTimer = HoodieTimer.start();
this.storage = table.getStorage();
this.path = makeNewPath(partitionPath);
+ this.eventTimeFieldGetter = initEventTimeFieldGetter(writeConfig, rowType);
this.writeStatus = new WriteStatus(table.shouldTrackSuccessRecords(),
writeConfig.getWriteStatusFailureFraction());
@@ -145,11 +154,15 @@ public class HoodieRowDataCreateHandle implements
Serializable {
} else {
rowData = record;
}
+ // Extract event time metadata when metadata is written (rowData matches rowType with event time field)
+ Option<Map<String, String>> recordMetadata = !skipMetadataWrite
+ ? extractEventTimeMetadata(rowData)
+ : Option.empty();
try {
fileWriter.writeRow(recordKey, rowData);
HoodieRecordDelegate recordDelegate =
writeStatus.isTrackingSuccessfulWrites()
? HoodieRecordDelegate.create(recordKey, partitionPath, null,
newRecordLocation) : null;
- writeStatus.markSuccess(recordDelegate, Option.empty());
+ writeStatus.markSuccess(recordDelegate, recordMetadata);
} catch (Throwable t) {
log.error("Error writing record " + record, t);
if (!writeConfig.getIgnoreWriteFailed()) {
@@ -163,6 +176,52 @@ public class HoodieRowDataCreateHandle implements
Serializable {
}
}
+ /**
+  * Builds the getter used to read the event time column named by
+  * {@code hoodie.payload.event.time.field}.
+  *
+  * <p>Only DOUBLE and BIGINT columns are supported. A warning is logged when the configured field
+  * is missing from the schema or has any other type.
+  *
+  * @param writeConfig write config whose payload properties carry the event time field name
+  * @param rowType     row type against which the field position is resolved
+  * @return a getter for the event time column, or {@code null} when the field is not configured,
+  *         not found in {@code rowType}, or of an unsupported type
+  */
+ private static RowData.FieldGetter initEventTimeFieldGetter(HoodieWriteConfig writeConfig, RowType rowType) {
+   String eventTimeField = writeConfig.getPayloadConfig().getProps().getProperty(
+       HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY);
+   if (eventTimeField == null || eventTimeField.isEmpty()) {
+     return null;
+   }
+   // Let RowType resolve the position by name; a negative index means the schema has no such field.
+   int fieldIndex = rowType.getFieldIndex(eventTimeField);
+   if (fieldIndex < 0) {
+     log.warn("Event time field '{}' configured but not found in schema. Event time metrics will be unavailable.",
+         eventTimeField);
+     return null;
+   }
+   RowType.RowField field = rowType.getFields().get(fieldIndex);
+   LogicalTypeRoot root = field.getType().getTypeRoot();
+   if (root != LogicalTypeRoot.DOUBLE && root != LogicalTypeRoot.BIGINT) {
+     log.warn("Event time field '{}' has unsupported type {}. Only DOUBLE and BIGINT are supported. "
+         + "Event time metrics will be unavailable.", eventTimeField, root);
+     return null;
+   }
+   return RowData.createFieldGetter(field.getType(), fieldIndex);
+ }
+
+ /**
+  * Reads the event time of {@code rowData} through the event time field getter and wraps it as
+  * record metadata keyed by {@code METADATA_EVENT_TIME_KEY}.
+  *
+  * <p>The value is narrowed to a long and rendered as a string with no unit interpretation here
+  * (the unit is expected to be inferred downstream by WriteStatus from the string length).
+  * NaN and infinite floating-point values are skipped: narrowing them to long would yield 0 or
+  * Long.MIN_VALUE/Long.MAX_VALUE instead of a real timestamp.
+  *
+  * @param rowData row to read from; expected to match the row type the getter was created for
+  * @return a single-entry metadata map, or empty when no getter is set, the value is null or not
+  *         a finite number, or reading the field fails
+  */
+ private Option<Map<String, String>> extractEventTimeMetadata(RowData rowData) {
+   if (eventTimeFieldGetter == null) {
+     return Option.empty();
+   }
+   try {
+     Object val = eventTimeFieldGetter.getFieldOrNull(rowData);
+     if (val instanceof Number) {
+       Number eventTime = (Number) val;
+       boolean floatingPoint = eventTime instanceof Double || eventTime instanceof Float;
+       if (!floatingPoint || Double.isFinite(eventTime.doubleValue())) {
+         return Option.of(Collections.singletonMap(METADATA_EVENT_TIME_KEY,
+             String.valueOf(eventTime.longValue())));
+       }
+     }
+   } catch (Exception e) {
+     // Best effort: a row that does not match the expected schema must not fail the write itself.
+     log.debug("Failed to extract event time", e);
+   }
+   return Option.empty();
+ }
+
/**
* Returns {@code true} if this handle can take in more writes. else {@code
false}.
*/
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataCreateHandle.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataCreateHandle.java
new file mode 100644
index 000000000000..97e6087a1120
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowDataCreateHandle.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodiePayloadProps;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieRowDataCreateHandle}.
+ *
+ * <p>Every test writes at least one row and closes the handle, so assertions are made on the
+ * resulting write stats and no handle is left open at the end of a test.
+ */
+public class TestHoodieRowDataCreateHandle extends HoodieFlinkClientTestHarness {
+
+  private static final String PARTITION_PATH = "2024/10/21";
+  private static final String FILE_ID = "file-1";
+  private static final String INSTANT_TIME = "20241021120000";
+  private static final int TASK_PARTITION_ID = 0;
+  private static final long TASK_ID = 1;
+  private static final long TASK_EPOCH_ID = 1;
+  private static final String EVENT_TIME_FIELD = "event_time";
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    initPath();
+    initFileSystem();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  /**
+   * Adds Hoodie metadata fields to the row type, matching production behavior in BulkInsertWriterHelper.
+   */
+  private static RowType addMetadataFields(RowType rowType, boolean withOperationField) {
+    List<RowType.RowField> mergedFields = new ArrayList<>();
+
+    LogicalType metadataFieldType = DataTypes.STRING().getLogicalType();
+    mergedFields.add(new RowType.RowField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, metadataFieldType));
+    mergedFields.add(new RowType.RowField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, metadataFieldType));
+    mergedFields.add(new RowType.RowField(HoodieRecord.RECORD_KEY_METADATA_FIELD, metadataFieldType));
+    mergedFields.add(new RowType.RowField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, metadataFieldType));
+    mergedFields.add(new RowType.RowField(HoodieRecord.FILENAME_METADATA_FIELD, metadataFieldType));
+
+    if (withOperationField) {
+      mergedFields.add(new RowType.RowField(HoodieRecord.OPERATION_METADATA_FIELD, metadataFieldType));
+    }
+
+    mergedFields.addAll(rowType.getFields());
+    return new RowType(mergedFields);
+  }
+
+  /**
+   * Builds the test schema (uuid, name, age, [extra field], partition) with the Hoodie metadata
+   * fields prepended.
+   *
+   * @param extraField name of the optional fourth column, or {@code null} to leave it out
+   * @param extraType  type of the optional fourth column; ignored when {@code extraField} is null
+   */
+  private static RowType createRowType(String extraField, DataType extraType) {
+    DataTypes.Field uuid = DataTypes.FIELD("uuid", DataTypes.VARCHAR(20));
+    DataTypes.Field name = DataTypes.FIELD("name", DataTypes.VARCHAR(20));
+    DataTypes.Field age = DataTypes.FIELD("age", DataTypes.INT());
+    DataTypes.Field partition = DataTypes.FIELD("partition", DataTypes.VARCHAR(20));
+    DataType dataType = extraField == null
+        ? DataTypes.ROW(uuid, name, age, partition).notNull()
+        : DataTypes.ROW(uuid, name, age, DataTypes.FIELD(extraField, extraType), partition).notNull();
+    // Add metadata fields like production code does
+    return addMetadataFields((RowType) dataType.getLogicalType(), false);
+  }
+
+  /**
+   * Creates a handle for the given row type.
+   *
+   * @param rowTypeWithMetadata row type including the Hoodie metadata fields
+   * @param withEventTimeConfig whether to set the payload event time field to {@code event_time}
+   */
+  private HoodieRowDataCreateHandle createHandle(RowType rowTypeWithMetadata, boolean withEventTimeConfig)
+      throws Exception {
+    Properties props = new Properties();
+    if (withEventTimeConfig) {
+      props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, EVENT_TIME_FIELD);
+    }
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withProperties(props)
+        .withEmbeddedTimelineServerEnabled(false)
+        .build();
+
+    HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, context, metaClient);
+
+    return new HoodieRowDataCreateHandle(
+        table, config, PARTITION_PATH, FILE_ID, INSTANT_TIME,
+        TASK_PARTITION_ID, TASK_ID, TASK_EPOCH_ID, rowTypeWithMetadata, false, false);
+  }
+
+  /**
+   * Creates a five-field data row (uuid, name, age, extra value, partition).
+   */
+  private static GenericRowData createRow(String uuid, String name, int age, Object extraValue) {
+    GenericRowData row = new GenericRowData(5);
+    row.setField(0, StringData.fromString(uuid));
+    row.setField(1, StringData.fromString(name));
+    row.setField(2, age);
+    row.setField(3, extraValue);
+    row.setField(4, StringData.fromString(PARTITION_PATH));
+    return row;
+  }
+
+  /**
+   * Writes the rows (keyed by their uuid column), closes the handle and returns its write status.
+   */
+  private static WriteStatus writeAndClose(HoodieRowDataCreateHandle handle, GenericRowData... rows)
+      throws IOException {
+    for (GenericRowData row : rows) {
+      handle.write(row.getString(0).toString(), PARTITION_PATH, row);
+    }
+    WriteStatus writeStatus = handle.close();
+    assertNotNull(writeStatus);
+    assertNotNull(writeStatus.getStat());
+    return writeStatus;
+  }
+
+  /**
+   * Asserts that neither min nor max event time was recorded in the write stat.
+   */
+  private static void assertNoEventTime(WriteStatus writeStatus, String condition) {
+    assertNull(writeStatus.getStat().getMinEventTime(),
+        "Min event time should be null when " + condition);
+    assertNull(writeStatus.getStat().getMaxEventTime(),
+        "Max event time should be null when " + condition);
+  }
+
+  @Test
+  public void testEventTimeFieldIndexWithDoubleType() throws Exception {
+    // Schema with DOUBLE event_time field, event time configured
+    HoodieRowDataCreateHandle handle = createHandle(createRowType(EVENT_TIME_FIELD, DataTypes.DOUBLE()), true);
+
+    double eventTimeSeconds = 1729512000.0; // 2024-10-21 12:00:00
+    WriteStatus writeStatus = writeAndClose(handle, createRow("id1", "Alice", 25, eventTimeSeconds));
+
+    // A single record defines both bounds (converted from seconds to millis)
+    Long minEventTime = writeStatus.getStat().getMinEventTime();
+    Long maxEventTime = writeStatus.getStat().getMaxEventTime();
+    assertNotNull(minEventTime, "Min event time should be populated");
+    assertNotNull(maxEventTime, "Max event time should be populated");
+    assertEquals((long) (eventTimeSeconds * 1000), minEventTime.longValue());
+    assertEquals((long) (eventTimeSeconds * 1000), maxEventTime.longValue());
+  }
+
+  @Test
+  public void testEventTimeFieldIndexWithWrongType() throws Exception {
+    // Schema with STRING event_time field (neither DOUBLE nor BIGINT)
+    HoodieRowDataCreateHandle handle = createHandle(createRowType(EVENT_TIME_FIELD, DataTypes.VARCHAR(20)), true);
+
+    // Handle is still usable; the unsupported type only disables event time metrics
+    WriteStatus writeStatus = writeAndClose(handle,
+        createRow("id1", "Alice", 25, StringData.fromString("1729512000")));
+
+    assertNoEventTime(writeStatus, "field type is unsupported");
+  }
+
+  @Test
+  public void testEventTimeFieldIndexWithMissingField() throws Exception {
+    // Schema without event_time field
+    HoodieRowDataCreateHandle handle = createHandle(createRowType(null, null), true);
+
+    GenericRowData row = new GenericRowData(4);
+    row.setField(0, StringData.fromString("id1"));
+    row.setField(1, StringData.fromString("Alice"));
+    row.setField(2, 25);
+    row.setField(3, StringData.fromString(PARTITION_PATH));
+
+    // Handle is still usable; the missing field only disables event time metrics
+    WriteStatus writeStatus = writeAndClose(handle, row);
+
+    assertNoEventTime(writeStatus, "field is missing from schema");
+  }
+
+  @Test
+  public void testEventTimeFieldIndexWithNoConfiguration() throws Exception {
+    // Schema with a DOUBLE field but no event_time configured
+    HoodieRowDataCreateHandle handle = createHandle(createRowType("timestamp", DataTypes.DOUBLE()), false);
+
+    WriteStatus writeStatus = writeAndClose(handle, createRow("id1", "Alice", 25, 1729512000.0));
+
+    assertNoEventTime(writeStatus, "not configured");
+  }
+
+  @Test
+  public void testEventTimeExtractionWithValidDoubleField() throws Exception {
+    // Schema with DOUBLE event_time field (epoch seconds)
+    HoodieRowDataCreateHandle handle = createHandle(createRowType(EVENT_TIME_FIELD, DataTypes.DOUBLE()), true);
+
+    double eventTimeSeconds1 = 1729512000.0; // 2024-10-21 12:00:00
+    double eventTimeSeconds2 = 1729515600.0; // 2024-10-21 13:00:00
+
+    WriteStatus writeStatus = writeAndClose(handle,
+        createRow("id1", "Alice", 25, eventTimeSeconds1),
+        createRow("id2", "Bob", 30, eventTimeSeconds2));
+
+    // Verify min/max event times are populated
+    Long minEventTime = writeStatus.getStat().getMinEventTime();
+    Long maxEventTime = writeStatus.getStat().getMaxEventTime();
+
+    assertNotNull(minEventTime, "Min event time should be populated");
+    assertNotNull(maxEventTime, "Max event time should be populated");
+
+    // Verify the values are correct (converted from seconds to millis)
+    assertEquals((long) (eventTimeSeconds1 * 1000), minEventTime.longValue(),
+        "Min event time should match first record");
+    assertEquals((long) (eventTimeSeconds2 * 1000), maxEventTime.longValue(),
+        "Max event time should match second record");
+
+    // Verify min <= max
+    assertTrue(minEventTime <= maxEventTime, "Min event time should be <= max event time");
+  }
+
+  @Test
+  public void testEventTimeExtractionWithBigintMillis() throws Exception {
+    // Schema with BIGINT event_time field (millis)
+    HoodieRowDataCreateHandle handle = createHandle(createRowType(EVENT_TIME_FIELD, DataTypes.BIGINT()), true);
+
+    long eventTimeMillis1 = 1729512000000L;
+    long eventTimeMillis2 = 1729515600000L;
+
+    WriteStatus writeStatus = writeAndClose(handle,
+        createRow("id1", "Alice", 25, eventTimeMillis1),
+        createRow("id2", "Bob", 30, eventTimeMillis2));
+
+    Long minEventTime = writeStatus.getStat().getMinEventTime();
+    Long maxEventTime = writeStatus.getStat().getMaxEventTime();
+    assertNotNull(minEventTime);
+    assertNotNull(maxEventTime);
+    assertEquals(eventTimeMillis1, minEventTime.longValue());
+    assertEquals(eventTimeMillis2, maxEventTime.longValue());
+  }
+
+  @Test
+  public void testEventTimeExtractionWithNullValues() throws Exception {
+    // Schema with DOUBLE event_time field
+    HoodieRowDataCreateHandle handle = createHandle(createRowType(EVENT_TIME_FIELD, DataTypes.DOUBLE()), true);
+
+    // Record with null event time
+    WriteStatus writeStatus = writeAndClose(handle, createRow("id1", "Alice", 25, null));
+
+    assertNoEventTime(writeStatus, "input is null");
+  }
+
+  @Test
+  public void testEventTimeExtractionWithoutConfiguration() throws Exception {
+    // Schema with DOUBLE field but no event_time configured
+    HoodieRowDataCreateHandle handle = createHandle(createRowType("timestamp", DataTypes.DOUBLE()), false);
+
+    // Record with timestamp
+    WriteStatus writeStatus = writeAndClose(handle, createRow("id1", "Alice", 25, 1729512000.0));
+
+    assertNoEventTime(writeStatus, "not configured");
+  }
+}