This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9fbf3b920d24e3e9f517beca10a2f4d892ae2d06
Author: jerryyue <jerry...@didiglobal.com>
AuthorDate: Fri Oct 28 19:16:46 2022 +0800

    [HUDI-5095] Flink: Store a special watermark (flag) in the commit metadata
    to identify the current progress of writing data
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  28 +++
 .../hudi/common/table/timeline/TimelineUtils.java  |   2 +-
 .../org/apache/hudi/common/util/DateTimeUtils.java |   8 +
 .../apache/hudi/configuration/FlinkOptions.java    |   7 +
 .../org/apache/hudi/sink/StreamWriteFunction.java  |   6 +
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  30 +++
 .../hudi/sink/append/AppendWriteFunction.java      |   2 +-
 .../hudi/sink/bulk/BulkInsertWriteFunction.java    |  10 +-
 .../sink/common/AbstractStreamWriteFunction.java   |  11 +-
 .../hudi/sink/common/AbstractWriteFunction.java    | 103 +++++++++
 .../hudi/sink/common/AbstractWriteOperator.java    |   9 +
 .../apache/hudi/sink/event/WriteMetadataEvent.java |  31 ++-
 .../java/org/apache/hudi/util/DataTypeUtils.java   | 141 +++++++++++++
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |  35 ++++
 .../sink/TestWriteFunctionEventTimeExtract.java    | 232 +++++++++++++++++++++
 .../sink/utils/StreamWriteFunctionWrapper.java     |   5 +-
 .../apache/hudi/sink/utils/TestDataTypeUtils.java  |  45 ++++
 .../hudi/utils/source/ContinuousFileSource.java    |   5 +
 18 files changed, 692 insertions(+), 18 deletions(-)
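
A minimal usage sketch of the feature in this commit, modeled on the new TestWriteFunctionEventTimeExtract#getLastEventTime test below: the writer is configured with FlinkOptions.EVENT_TIME_FIELD, the coordinator then stores the smallest per-subtask max event time in the commit's extra metadata under that option key, and the value can be read back from the latest completed instant. The class name, the "ts" column, and the use of the test helper HoodieTestUtils.init to obtain a meta client are illustrative assumptions, not part of the commit itself.

import java.io.IOException;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;

public class EventTimeWatermarkSketch {

  // Enable event-time tracking before building the write pipeline
  // (the field name "ts" is only an example).
  public static void enableEventTime(Configuration conf) {
    conf.setString(FlinkOptions.EVENT_TIME_FIELD, "ts");
  }

  // Read the event-time watermark stored by the coordinator in the latest
  // completed commit; returns -1 if no commit has been written yet.
  public static long lastCommittedEventTime(Configuration conf) throws IOException {
    // HoodieTestUtils.init is the helper the new test uses to build a meta client.
    HoodieTableMetaClient metaClient = HoodieTestUtils.init(conf.getString(FlinkOptions.PATH));
    HoodieInstant latest = metaClient.getActiveTimeline()
        .getCommitsTimeline().filterCompletedInstants()
        .getReverseOrderedInstants().findFirst().orElse(null);
    if (latest == null) {
      return -1L;
    }
    Option<String> eventTime =
        TimelineUtils.getMetadataValue(metaClient, FlinkOptions.EVENT_TIME_FIELD.key(), latest);
    return Long.parseLong(eventTime.orElseGet(() -> "-1"));
  }
}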

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index a352e86b96..735940eea4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -75,6 +75,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Deque;
 import java.util.LinkedList;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
@@ -657,6 +658,33 @@ public class HoodieAvroUtils {
     return fieldValue;
   }
 
+  public static Long getNestedFieldValAsLong(GenericRecord record, String fieldName, boolean consistentLogicalTimestampEnabled, Long defaultValue) {
+    GenericRecord valueNode = record;
+    Object fieldValue = valueNode.get(fieldName);
+    Schema fieldSchema = valueNode.getSchema().getField(fieldName).schema();
+    if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
+      return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString())).toEpochDay();
+    } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMillis() && consistentLogicalTimestampEnabled) {
+      return new Timestamp(Long.parseLong(fieldValue.toString())).getTime();
+    } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMicros() && consistentLogicalTimestampEnabled) {
+      return new Timestamp(Long.parseLong(fieldValue.toString()) / 1000).getTime();
+    } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+      Decimal dc = (Decimal) fieldSchema.getLogicalType();
+      DecimalConversion decimalConversion = new DecimalConversion();
+      if (fieldSchema.getType() == Schema.Type.FIXED) {
+        return decimalConversion.fromFixed((GenericFixed) fieldValue, fieldSchema,
+          LogicalTypes.decimal(dc.getPrecision(), dc.getScale())).longValue();
+      } else if (fieldSchema.getType() == Schema.Type.BYTES) {
+        ByteBuffer byteBuffer = (ByteBuffer) fieldValue;
+        BigDecimal convertedValue = decimalConversion.fromBytes(byteBuffer, fieldSchema,
+            LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
+        byteBuffer.rewind();
+        return convertedValue.longValue();
+      }
+    }
+    return Objects.isNull(fieldValue) ? defaultValue : Long.parseLong(fieldValue.toString());
+  }
+
   public static Schema getNullSchema() {
     return Schema.create(Schema.Type.NULL);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 75493e7b46..1b8450eecc 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -173,7 +173,7 @@ public class TimelineUtils {
           HoodieInstant::getTimestamp, instant -> getMetadataValue(metaClient, 
extraMetadataKey, instant)));
   }
 
-  private static Option<String> getMetadataValue(HoodieTableMetaClient 
metaClient, String extraMetadataKey, HoodieInstant instant) {
+  public static Option<String> getMetadataValue(HoodieTableMetaClient 
metaClient, String extraMetadataKey, HoodieInstant instant) {
     try {
       LOG.info("reading checkpoint info for:"  + instant + " key: " + 
extraMetadataKey);
       HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
index cf90eff8d6..1b1e845dfc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
@@ -22,7 +22,9 @@ package org.apache.hudi.common.util;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
 import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
 import java.time.temporal.ChronoUnit;
@@ -39,6 +41,8 @@ public class DateTimeUtils {
   private static final Map<String, ChronoUnit> LABEL_TO_UNIT_MAP =
       Collections.unmodifiableMap(initMap());
 
+  public static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+
   /**
    * Converts provided microseconds (from epoch) to {@link Instant}
    */
@@ -172,6 +176,10 @@ public class DateTimeUtils {
         .format(dtf);
   }
 
+  public static long millisFromTimestamp(LocalDateTime dateTime) {
+    return ChronoUnit.MILLIS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
+  }
+
   /**
    * Enum which defines time unit, mostly used to parse value from 
configuration file.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 31c8b554c0..aa1e3297bd 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -255,6 +256,12 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(60)// default 1 minute
       .withDescription("Check interval for streaming read of SECOND, default 1 
minute");
 
+  public static final ConfigOption<String> EVENT_TIME_FIELD = ConfigOptions
+      .key(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY)
+      .stringType()
+      .noDefaultValue()
+      .withDescription("event time field name for flink");
+
   // this option is experimental
   public static final ConfigOption<Boolean> READ_STREAMING_SKIP_COMPACT = 
ConfigOptions
       .key("read.streaming.skip_compaction")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index a0f994f04a..bd1b1e68f1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -446,6 +446,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
         .writeStatus(writeStatus)
         .lastBatch(false)
         .endInput(false)
+        .maxEventTime(this.currentTimeStamp)
         .build();
 
     this.eventGateway.sendEventToCoordinator(event);
@@ -482,14 +483,19 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
       LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, 
currentInstant);
       writeStatus = Collections.emptyList();
     }
+
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
         .taskID(taskID)
         .instantTime(currentInstant)
         .writeStatus(writeStatus)
         .lastBatch(true)
         .endInput(endInput)
+        .maxEventTime(this.currentTimeStamp)
         .build();
 
+    LOG.info("Write MetadataEvent in subtask [{}] for instant [{}] 
maxEventTime [{}]",
+        taskID, currentInstant, this.currentTimeStamp);
+
     this.eventGateway.sendEventToCoordinator(event);
     this.buckets.clear();
     this.tracer.reset();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 670748b90f..578bb10db5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -56,6 +56,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -147,6 +148,10 @@ public class StreamWriteOperatorCoordinator
    */
   private transient TableState tableState;
 
+  private transient Long minEventTime = Long.MAX_VALUE;
+
+  private final boolean commitEventTimeEnable;
+
   /**
    * The checkpoint metadata.
    */
@@ -165,6 +170,7 @@ public class StreamWriteOperatorCoordinator
     this.context = context;
     this.parallelism = context.currentParallelism();
     this.hiveConf = new 
SerializableConfiguration(HadoopConfigurations.getHiveConf(conf));
+    this.commitEventTimeEnable = Objects.nonNull(conf.get(FlinkOptions.EVENT_TIME_FIELD));
   }
 
   @Override
@@ -503,10 +509,30 @@ public class StreamWriteOperatorCoordinator
       sendCommitAckEvents(checkpointId);
       return false;
     }
+    setMinEventTime();
     doCommit(instant, writeResults);
+    resetMinEventTime();
     return true;
   }
 
+  public void setMinEventTime() {
+    if (commitEventTimeEnable) {
+      LOG.info("[setMinEventTime] receive event time for current commit: {} ", Arrays.stream(eventBuffer).map(WriteMetadataEvent::getMaxEventTime).map(String::valueOf)
+          .collect(Collectors.joining(", ")));
+      this.minEventTime = Arrays.stream(eventBuffer)
+          .filter(Objects::nonNull)
+          .filter(maxEventTime -> maxEventTime.getMaxEventTime() > 0)
+          .map(WriteMetadataEvent::getMaxEventTime)
+          .min(Comparator.naturalOrder())
+          .map(aLong -> Math.min(aLong, this.minEventTime)).orElse(Long.MAX_VALUE);
+      LOG.info("[setMinEventTime] minEventTime: {} ", this.minEventTime);
+    }
+  }
+
+  public void resetMinEventTime() {
+    this.minEventTime = Long.MAX_VALUE;
+  }
+
   /**
    * Performs the actual commit action.
    */
@@ -519,6 +545,10 @@ public class StreamWriteOperatorCoordinator
 
     if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
       HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+      if (commitEventTimeEnable) {
+        LOG.info("[doCommit] minEventTime: {} ", this.minEventTime);
+        checkpointCommitMetadata.put(FlinkOptions.EVENT_TIME_FIELD.key(), this.minEventTime.toString());
+      }
       if (hasErrors) {
         LOG.warn("Some records failed to merge but forcing commit since 
commitOnErrors set to true. Errors/Total="
             + totalErrorRecords + "/" + totalRecords);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index e1db125731..afecc6cc49 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -59,7 +59,6 @@ public class AppendWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
    * Table row type.
    */
   private final RowType rowType;
-
   /**
    * Constructs an AppendWriteFunction.
    *
@@ -133,6 +132,7 @@ public class AppendWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
         .writeStatus(writeStatus)
         .lastBatch(true)
         .endInput(endInput)
+        .maxEventTime(this.currentTimeStamp)
         .build();
     this.eventGateway.sendEventToCoordinator(event);
     // nullify the write helper for next ckp
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index 06d9fcd851..63560fa123 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -63,12 +63,6 @@ public class BulkInsertWriteFunction<I>
    * Helper class for bulk insert mode.
    */
   private transient BulkInsertWriterHelper writerHelper;
-
-  /**
-   * Config options.
-   */
-  private final Configuration config;
-
   /**
    * Table row type.
    */
@@ -105,7 +99,7 @@ public class BulkInsertWriteFunction<I>
    * @param config The config options
    */
   public BulkInsertWriteFunction(Configuration config, RowType rowType) {
-    this.config = config;
+    super(config);
     this.rowType = rowType;
   }
 
@@ -144,6 +138,7 @@ public class BulkInsertWriteFunction<I>
         .writeStatus(writeStatus)
         .lastBatch(true)
         .endInput(true)
+        .maxEventTime(this.currentTimeStamp)
         .build();
     this.eventGateway.sendEventToCoordinator(event);
   }
@@ -180,6 +175,7 @@ public class BulkInsertWriteFunction<I>
         .bootstrap(true)
         .build();
     this.eventGateway.sendEventToCoordinator(event);
+    resetEventTime();
     LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", 
taskID);
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 674cd3588a..b4569894a2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -58,12 +58,6 @@ public abstract class AbstractStreamWriteFunction<I>
     implements CheckpointedFunction {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractStreamWriteFunction.class);
-
-  /**
-   * Config options.
-   */
-  protected final Configuration config;
-
   /**
    * Id of current subtask.
    */
@@ -132,7 +126,7 @@ public abstract class AbstractStreamWriteFunction<I>
    * @param config The config options
    */
   public AbstractStreamWriteFunction(Configuration config) {
-    this.config = config;
+    super(config);
   }
 
   @Override
@@ -166,6 +160,8 @@ public abstract class AbstractStreamWriteFunction<I>
     snapshotState();
     // Reload the snapshot state as the current state.
     reloadWriteMetaState();
+    // reset the event time for the current checkpoint interval
+    resetEventTime();
   }
 
   public abstract void snapshotState();
@@ -225,6 +221,7 @@ public abstract class AbstractStreamWriteFunction<I>
         .instantTime(currentInstant)
         .writeStatus(new ArrayList<>(writeStatuses))
         .bootstrap(true)
+        .maxEventTime(this.currentTimeStamp)
         .build();
     this.writeMetadataState.add(event);
     writeStatuses.clear();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
index 9e131ff91e..7756a1c2fa 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
@@ -18,10 +18,26 @@
 
 package org.apache.hudi.sink.common;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.StreamerUtil;
+
+import java.io.IOException;
+import java.util.Objects;
 
 /**
  * Base class for write function.
@@ -29,6 +45,39 @@ import 
org.apache.flink.streaming.api.operators.BoundedOneInput;
  * @param <I> the input type
  */
 public abstract class AbstractWriteFunction<I> extends ProcessFunction<I, 
Object> implements BoundedOneInput {
+
+  /**
+   * Config options.
+   */
+  protected final Configuration config;
+  /**
+   * The maximum event time this write task has seen so far.
+   */
+  protected Long currentTimeStamp = -1L;
+
+  protected String eventTimeField;
+
+  protected int eventTimeFieldIndex;
+
+  protected LogicalType eventTimeDataType;
+
+  protected RowData.FieldGetter eventTimeFieldGetter;
+
+  protected Schema writeSchema;
+
+  public AbstractWriteFunction(Configuration config) {
+    this.config = config;
+    if (config.containsKey(FlinkOptions.EVENT_TIME_FIELD.key())) {
+      this.writeSchema = StreamerUtil.getSourceSchema(config);
+      this.eventTimeField = config.getString(FlinkOptions.EVENT_TIME_FIELD);
+      this.eventTimeFieldIndex = this.writeSchema.getField(this.eventTimeField).pos();
+      RowType rowType =
+          (RowType) AvroSchemaConverter.convertToDataType(this.writeSchema).getLogicalType();
+      this.eventTimeDataType = rowType.getTypeAt(eventTimeFieldIndex);
+      this.eventTimeFieldGetter = RowData.createFieldGetter(eventTimeDataType, eventTimeFieldIndex);
+    }
+  }
+
   /**
    * Sets up the event gateway.
    */
@@ -45,4 +94,58 @@ public abstract class AbstractWriteFunction<I> extends 
ProcessFunction<I, Object
    * @param event The event
    */
   public abstract void handleOperatorEvent(OperatorEvent event);
+
+  /**
+   * Extracts the timestamp from the input value using the configured event time field.
+   *
+   * @param value The input value
+   * @return the extracted event time of the current record.
+   */
+  public long extractTimestamp(I value) {
+    if (value instanceof HoodieAvroRecord) {
+      return extractTimestamp((HoodieAvroRecord) value);
+    }
+    return extractTimestamp((RowData) value);
+  }
+
+  /**
+   * Returns whether event time extraction from records is enabled.
+   *
+   * @return true if an event time field is configured.
+   */
+  public boolean extractTimeStampEnable() {
+    return Objects.nonNull(this.eventTimeField);
+  }
+
+  public long extractTimestamp(HoodieAvroRecord value) {
+    try {
+      GenericRecord record = (GenericRecord) value.getData()
+          .getInsertValue(this.writeSchema).get();
+      Long eventTime = HoodieAvroUtils.getNestedFieldValAsLong(
+          record, eventTimeField,
+          true, -1L);
+      this.currentTimeStamp = Math.max(eventTime, this.currentTimeStamp);
+      return eventTime;
+    } catch (IOException e) {
+      throw new HoodieException("extract event time failed. " + e);
+    }
+  }
+
+  public long extractTimestamp(RowData value) {
+    Long eventTime = -1L;
+    try {
+      Object eventTimeObject =
+          value.isNullAt(eventTimeFieldIndex) ? -1L : this.eventTimeFieldGetter.getFieldOrNull(value);
+      eventTime = DataTypeUtils.getAsLong(eventTimeObject, this.eventTimeDataType);
+      this.currentTimeStamp = Math.max(eventTime, this.currentTimeStamp);
+    } catch (Throwable e) {
+      throw new HoodieException(String.format("eventTimeFieldIndex=%s, eventTimeDataType=%s, eventTime=%s. ",
+          this.eventTimeFieldIndex, this.eventTimeDataType, eventTime) + e);
+    }
+    return eventTime;
+  }
+
+  public void resetEventTime() {
+    this.currentTimeStamp = -1L;
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
index e339ccb0b7..bd41898487 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * Base class for write operator.
@@ -39,6 +40,14 @@ public abstract class AbstractWriteOperator<I>
     this.function = function;
   }
 
+  @Override
+  public void processElement(StreamRecord<I> element) throws Exception {
+    if (this.function.extractTimeStampEnable()) {
+      this.function.extractTimestamp(element.getValue());
+    }
+    super.processElement(element);
+  }
+
   public void setOperatorEventGateway(OperatorEventGateway 
operatorEventGateway) {
     this.function.setOperatorEventGateway(operatorEventGateway);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
index 0eb06bdd82..f68052e895 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java
@@ -54,6 +54,7 @@ public class WriteMetadataEvent implements OperatorEvent {
    */
   private boolean bootstrap;
 
+  private Long maxEventTime = Long.MIN_VALUE;
   /**
    * Creates an event.
    *
@@ -81,6 +82,18 @@ public class WriteMetadataEvent implements OperatorEvent {
     this.bootstrap = bootstrap;
   }
 
+  private WriteMetadataEvent(
+      int taskID,
+      String instantTime,
+      List<WriteStatus> writeStatuses,
+      boolean lastBatch,
+      boolean endInput,
+      boolean bootstrap,
+      Long maxEventTime) {
+    this(taskID, instantTime, writeStatuses, lastBatch, endInput, bootstrap);
+    this.maxEventTime = maxEventTime;
+  }
+
   // default constructor for efficient serialization
   public WriteMetadataEvent() {
   }
@@ -140,6 +153,14 @@ public class WriteMetadataEvent implements OperatorEvent {
     this.lastBatch = lastBatch;
   }
 
+  public Long getMaxEventTime() {
+    return maxEventTime;
+  }
+
+  public void setMaxEventTime(Long maxEventTime) {
+    this.maxEventTime = maxEventTime;
+  }
+
   /**
    * Merges this event with given {@link WriteMetadataEvent} {@code other}.
    *
@@ -172,6 +193,7 @@ public class WriteMetadataEvent implements OperatorEvent {
         + ", lastBatch=" + lastBatch
         + ", endInput=" + endInput
         + ", bootstrap=" + bootstrap
+        + ", maxEventTime=" + maxEventTime
         + '}';
   }
 
@@ -209,11 +231,13 @@ public class WriteMetadataEvent implements OperatorEvent {
     private boolean endInput = false;
     private boolean bootstrap = false;
 
+    private Long maxEventTime = Long.MAX_VALUE;
+
     public WriteMetadataEvent build() {
       Objects.requireNonNull(taskID);
       Objects.requireNonNull(instantTime);
       Objects.requireNonNull(writeStatus);
-      return new WriteMetadataEvent(taskID, instantTime, writeStatus, lastBatch, endInput, bootstrap);
+      return new WriteMetadataEvent(taskID, instantTime, writeStatus, lastBatch, endInput, bootstrap, maxEventTime);
     }
 
     public Builder taskID(int taskID) {
@@ -226,6 +250,11 @@ public class WriteMetadataEvent implements OperatorEvent {
       return this;
     }
 
+    public Builder maxEventTime(Long maxEventTime) {
+      this.maxEventTime = maxEventTime;
+      return this;
+    }
+
     public Builder writeStatus(List<WriteStatus> writeStatus) {
       this.writeStatus = writeStatus;
       return this;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
index e91432b5e3..6c4bb9925c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.util;
 
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -26,15 +28,41 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 
+import javax.annotation.Nullable;
 import java.math.BigDecimal;
+import java.time.DateTimeException;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.TemporalAccessor;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+
+import static java.time.temporal.ChronoField.DAY_OF_MONTH;
+import static java.time.temporal.ChronoField.HOUR_OF_DAY;
+import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
+import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
+import static java.time.temporal.ChronoField.NANO_OF_SECOND;
+import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
+import static java.time.temporal.ChronoField.YEAR;
 
 /**
  * Utilities for {@link org.apache.flink.table.types.DataType}.
  */
 public class DataTypeUtils {
+
+  private static final DateTimeFormatter DEFAULT_TIMESTAMP_FORMATTER =
+      new DateTimeFormatterBuilder()
+          .appendPattern("yyyy-[MM][M]-[dd][d]")
+          .optionalStart()
+          .appendPattern(" [HH][H]:[mm][m]:[ss][s]")
+          .appendFraction(NANO_OF_SECOND, 0, 9, true)
+          .optionalEnd()
+          .toFormatter();
+
   /**
    * Returns whether the given type is TIMESTAMP type.
    */
@@ -123,4 +151,117 @@ public class DataTypeUtils {
                 "Can not convert %s to type %s for partition value", 
partition, type));
     }
   }
+
+  /**
+   * Ensures the given columns of the row data type are not nullable (for example, the primary keys).
+   *
+   * @param dataType  The row data type; its logical type must be a row type
+   * @param pkColumns The primary keys
+   * @return a new row data type if any column nullability is tweaked or the original data type
+   */
+  public static DataType ensureColumnsAsNonNullable(DataType dataType, @Nullable List<String> pkColumns) {
+    if (pkColumns == null || pkColumns.isEmpty()) {
+      return dataType;
+    }
+    LogicalType dataTypeLogicalType = dataType.getLogicalType();
+    if (!(dataTypeLogicalType instanceof RowType)) {
+      throw new RuntimeException("The datatype to be converted must be row type, but this type is :" + dataTypeLogicalType.getClass());
+    }
+    RowType rowType = (RowType) dataTypeLogicalType;
+    List<DataType> originalFieldTypes = dataType.getChildren();
+    List<String> fieldNames = rowType.getFieldNames();
+    List<DataType> fieldTypes = new ArrayList<>();
+    boolean tweaked = false;
+    for (int i = 0; i < fieldNames.size(); i++) {
+      if (pkColumns.contains(fieldNames.get(i)) && rowType.getTypeAt(i).isNullable()) {
+        fieldTypes.add(originalFieldTypes.get(i).notNull());
+        tweaked = true;
+      } else {
+        fieldTypes.add(originalFieldTypes.get(i));
+      }
+    }
+    if (!tweaked) {
+      return dataType;
+    }
+    List<DataTypes.Field> fields = new ArrayList<>();
+    for (int i = 0; i < fieldNames.size(); i++) {
+      fields.add(DataTypes.FIELD(fieldNames.get(i), fieldTypes.get(i)));
+    }
+    return DataTypes.ROW(fields.stream().toArray(DataTypes.Field[]::new)).notNull();
+  }
+
+  public static Long getAsLong(Object value, LogicalType logicalType) {
+    if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
+        || logicalType.getTypeRoot() == LogicalTypeRoot.DATE
+        || logicalType.getTypeRoot() == LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE) {
+      return toMills(toLocalDateTime(value.toString()));
+    }
+    return Long.parseLong(value.toString());
+  }
+
+  public static LocalDateTime toLocalDateTime(String timestampString) {
+    try {
+      return parseTimestampData(timestampString);
+    } catch (DateTimeParseException e) {
+      return LocalDateTime.parse(timestampString);
+    }
+  }
+
+  public static LocalDateTime parseTimestampData(String dateStr) throws DateTimeException {
+    // Precision is hardcoded to match signature of TO_TIMESTAMP
+    //  https://issues.apache.org/jira/browse/FLINK-14925
+    return parseTimestampData(dateStr, 3);
+  }
+
+  public static LocalDateTime parseTimestampData(String dateStr, int precision)
+      throws DateTimeException {
+    return fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), precision);
+  }
+
+  public static long toMills(LocalDateTime dateTime) {
+    return TimestampData.fromLocalDateTime(dateTime).getMillisecond();
+  }
+
+  private static LocalDateTime fromTemporalAccessor(TemporalAccessor accessor, int precision) {
+    // complement year with 1970
+    int year = accessor.isSupported(YEAR) ? accessor.get(YEAR) : 1970;
+    // complement month with 1
+    int month = accessor.isSupported(MONTH_OF_YEAR) ? accessor.get(MONTH_OF_YEAR) : 1;
+    // complement day with 1
+    int day = accessor.isSupported(DAY_OF_MONTH) ? accessor.get(DAY_OF_MONTH) : 1;
+    // complement hour with 0
+    int hour = accessor.isSupported(HOUR_OF_DAY) ? accessor.get(HOUR_OF_DAY) : 0;
+    // complement minute with 0
+    int minute = accessor.isSupported(MINUTE_OF_HOUR) ? accessor.get(MINUTE_OF_HOUR) : 0;
+    // complement second with 0
+    int second = accessor.isSupported(SECOND_OF_MINUTE) ? accessor.get(SECOND_OF_MINUTE) : 0;
+    // complement nano_of_second with 0
+    int nanoOfSecond = accessor.isSupported(NANO_OF_SECOND) ? accessor.get(NANO_OF_SECOND) : 0;
+
+    if (precision == 0) {
+      nanoOfSecond = 0;
+    } else if (precision != 9) {
+      nanoOfSecond = (int) floor(nanoOfSecond, powerX(10, 9 - precision));
+    }
+
+    return LocalDateTime.of(year, month, day, hour, minute, second, nanoOfSecond);
+  }
+
+  private static long floor(long a, long b) {
+    long r = a % b;
+    if (r < 0) {
+      return a - r - b;
+    } else {
+      return a - r;
+    }
+  }
+
+  private static long powerX(long a, long b) {
+    long x = 1;
+    while (b > 0) {
+      x *= a;
+      --b;
+    }
+    return x;
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 193c0abcd8..9556a94083 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -18,14 +18,21 @@
 
 package org.apache.hudi.sink;
 
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsInference;
 import org.apache.hudi.sink.transform.ChainedTransformer;
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
+import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.HoodiePipeline;
 import org.apache.hudi.util.StreamerUtil;
@@ -67,6 +74,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Integration test for Flink Hoodie stream sink.
@@ -109,6 +119,31 @@ public class ITTestDataStreamWrite extends TestLogger {
     testWriteToHoodie(conf, "cow_write", 2, EXPECTED);
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"BUCKET", "FLINK_STATE"})
+  public void testWriteCopyOnWriteWithEventTimeExtract(String indexType) throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+    conf.setString(FlinkOptions.INDEX_TYPE, indexType);
+    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
+    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+    conf.setString(FlinkOptions.EVENT_TIME_FIELD, "ts");
+
+    testWriteToHoodie(conf, "cow_write", 1, EXPECTED);
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(conf.getString(FlinkOptions.PATH));
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(conf.getString(FlinkOptions.PATH)).build();
+    HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
+    List<HoodieInstant> hoodieInstants = table.getFileSystemView().getTimeline().getInstants().collect(Collectors.toList());
+    assertEquals(1, hoodieInstants.size());
+    byte[] data = table.getFileSystemView().getTimeline().getInstantDetails(hoodieInstants.get(0)).get();
+    Map<String, String> extraMetadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class).getExtraMetadata();
+    if (indexType.equals("BUCKET")) {
+      assertEquals("2000", extraMetadata.get(FlinkOptions.EVENT_TIME_FIELD.key()));
+    } else {
+      assertEquals("4000", extraMetadata.get(FlinkOptions.EVENT_TIME_FIELD.key()));
+    }
+  }
+
   @Test
   public void testWriteCopyOnWriteWithTransformer() throws Exception {
     Transformer transformer = (ds) -> ds.map((rowdata) -> {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteFunctionEventTimeExtract.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteFunctionEventTimeExtract.java
new file mode 100644
index 0000000000..52ba4e07b9
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteFunctionEventTimeExtract.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLogger;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.DateTimeUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.HoodiePipeline;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.source.ContinuousFileSource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+public class TestWriteFunctionEventTimeExtract extends TestLogger {
+
+  @TempDir
+  File tempFile;
+
+  public static final DataType ROW_DATA_TYPE = DataTypes.ROW(
+          DataTypes.FIELD("id", DataTypes.VARCHAR(20)),// record key
+          DataTypes.FIELD("data", DataTypes.VARCHAR(10)),
+          DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
+          DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+      .notNull();
+
+  private static Stream<Arguments> parameters() {
+    return Stream.of(
+        Arguments.of("COPY_ON_WRITE", "FLINK_STATE", 1, false),
+        Arguments.of("COPY_ON_WRITE", "FLINK_STATE", 2, true),
+        Arguments.of("MERGE_ON_READ", "FLINK_STATE", 1, false),
+        Arguments.of("MERGE_ON_READ", "FLINK_STATE", 2, true),
+        Arguments.of("COPY_ON_WRITE", "BUCKET", 1, false),
+        Arguments.of("COPY_ON_WRITE", "BUCKET", 2, true),
+        Arguments.of("MERGE_ON_READ", "BUCKET", 1, false),
+        Arguments.of("MERGE_ON_READ", "BUCKET", 2, true),
+        Arguments.of("MERGE_ON_READ", "NON_INDEX", 1, false),
+        Arguments.of("MERGE_ON_READ", "NON_INDEX", 2, true));
+  }
+
+  @ParameterizedTest
+  @MethodSource("parameters")
+  void testWriteWithEventTime(String tableType, String indexType, int parallelism, boolean partitioned) throws Exception {
+    Configuration conf = getConf(tableType, indexType, parallelism);
+
+    List<Row> rows =
+        Lists.newArrayList(
+            Row.of("1", "hello", LocalDateTime.parse("2012-12-12T12:12:12")),
+            Row.of("2", "world", LocalDateTime.parse("2012-12-12T12:12:01")),
+            Row.of("3", "world", LocalDateTime.parse("2012-12-12T12:12:02")),
+            Row.of("4", "foo", LocalDateTime.parse("2012-12-12T12:12:10")));
+
+    // write rowData with eventTime
+    testWriteToHoodie(conf, parallelism, partitioned, ((RowType) ROW_DATA_TYPE.getLogicalType()), rows);
+
+    // check eventTime
+    checkWriteEventTime(indexType, parallelism, conf);
+  }
+
+  public Configuration getConf(String tableType, String indexType, int parallelism) {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath(), ROW_DATA_TYPE);
+    conf.setString(FlinkOptions.TABLE_TYPE, tableType);
+    conf.setString(FlinkOptions.INDEX_TYPE, indexType);
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, "id");
+    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
+    conf.setString(FlinkOptions.EVENT_TIME_FIELD, "ts");
+    conf.setInteger(FlinkOptions.WRITE_TASKS, parallelism);
+    conf.setString("hoodie.metrics.on", "false");
+    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, parallelism);
+    return conf;
+  }
+
+  private void checkWriteEventTime(String indexType, int parallelism, Configuration conf) throws IOException {
+    if (parallelism <= 1) { // single mode
+      Assertions.assertEquals(DateTimeUtils.millisFromTimestamp(LocalDateTime.parse("2012-12-12T12:12:12")),
+          getLastEventTime(conf));
+    } else if (indexType.equals("BUCKET") || indexType.equals("NON_INDEX")) { // hash mode
+      Assertions.assertEquals(DateTimeUtils.millisFromTimestamp(LocalDateTime.parse("2012-12-12T12:12:10")),
+          getLastEventTime(conf));
+    } else { // partition mode
+      Assertions.assertEquals(DateTimeUtils.millisFromTimestamp(LocalDateTime.parse("2012-12-12T12:12:02")),
+          getLastEventTime(conf));
+    }
+  }
+
+  public long getLastEventTime(Configuration conf) throws IOException {
+    String path = conf.getString(FlinkOptions.PATH);
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(path);
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    HoodieInstant hoodieInstant = timeline.getReverseOrderedInstants()
+        .findFirst().orElse(null);
+    if (hoodieInstant == null) {
+      return -1L;
+    } else {
+      Option<String> eventtime = TimelineUtils.getMetadataValue(metaClient, FlinkOptions.EVENT_TIME_FIELD.key(),
+          hoodieInstant);
+      return Long.parseLong(eventtime.orElseGet(() -> "-1"));
+    }
+  }
+
+  public List<String> getRowDataString(List<Row> rows, boolean partitioned) {
+    List<String> dataBuffer = new ArrayList<>();
+    for (Row row : rows) {
+      String id = (String) row.getField(0);
+      String data = (String) row.getField(1);
+      LocalDateTime ts = (LocalDateTime) row.getField(2);
+      String rowData = String.format("{\"id\": \"%s\", \"data\": \"%s\", \"ts\": \"%s\", \"partition\": \"%s\"}",
+          id, data, ts.toString(), partitioned ? data : "par");
+      dataBuffer.add(rowData);
+    }
+    return dataBuffer;
+  }
+
+  private void testWriteToHoodie(
+      Configuration conf,
+      int parallelism,
+      boolean partitioned,
+      RowType rowType,
+      List<Row> rows) throws Exception {
+
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(parallelism);
+    execEnv.setMaxParallelism(parallelism);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        InternalTypeInfo.of(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+
+    boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
+
+    List<String> dataBuffer = getRowDataString(rows, partitioned);
+
+    DataStream<RowData> dataStream;
+    dataStream = execEnv
+        // use continuous file source to trigger checkpoint
+        .addSource(new ContinuousFileSource.BoundedSourceFunction(null, dataBuffer, 1))
+        .name("continuous_file_source")
+        .setParallelism(1)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(1);
+
+
+    // sink to the hoodie table using the low-level sink API.
+    HoodiePipeline.Builder builder = HoodiePipeline.builder("t_event_sink")
+        .column("id string not null")
+        .column("data string")
+        .column("`ts` timestamp(3)")
+        .column("`partition` string")
+        .pk("id")
+        .partition("partition")
+        .options(conf.toMap());
+
+    builder.sink(dataStream, false);
+    execute(execEnv, isMor, "EventTime_Sink_Test");
+  }
+
+  public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
+    if (isMor) {
+      JobClient client = execEnv.executeAsync(jobName);
+      if (client.getJobStatus().get() != JobStatus.FAILED) {
+        try {
+          TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
+          client.cancel();
+        } catch (Throwable var1) {
+          // ignored
+        }
+      }
+    } else {
+      // wait for the streaming job to finish
+      execEnv.execute(jobName);
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index b83f3cc478..d31a884827 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -171,8 +171,11 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     } else {
       bucketAssignerFunction.processElement(hoodieRecord, null, collector);
       bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
-      writeFunction.processElement(collector.getVal(), null, null);
+      if (writeFunction.extractTimeStampEnable()) {
+        writeFunction.extractTimestamp(collector.getVal());
+      }
     }
+    writeFunction.processElement(collector.getVal(), null, null);
   }
 
   public WriteMetadataEvent[] getEventBuffer() {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestDataTypeUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestDataTypeUtils.java
new file mode 100644
index 0000000000..cda252ca5d
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestDataTypeUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.hudi.util.DataTypeUtils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestDataTypeUtils {
+  @Test
+  public void testGetAsLong() {
+    long t1 = DataTypeUtils.getAsLong("2012-12-12T12:12:12", 
DataTypes.TIMESTAMP(3).getLogicalType());
+    assertEquals(1355314332000L, t1);
+
+    long t2 = DataTypeUtils.getAsLong("2012-12-12 12:12:12", 
DataTypes.TIME().getLogicalType());
+    assertEquals(1355314332000L, t2);
+
+    long t3 = DataTypeUtils.getAsLong("2012-12-12", 
DataTypes.DATE().getLogicalType());
+    assertEquals(1355270400000L, t3);
+
+    long t4 = DataTypeUtils.getAsLong(100L, 
DataTypes.BIGINT().getLogicalType());
+    assertEquals(100L, t4);
+
+    long t5 = DataTypeUtils.getAsLong(100, DataTypes.INT().getLogicalType());
+    assertEquals(100, t5);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
index 2830eefef0..1715593077 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
@@ -133,6 +133,11 @@ public class ContinuousFileSource implements 
ScanTableSource {
       this.checkpoints = checkpoints;
     }
 
+    public BoundedSourceFunction(Path path, List<String> dataBuffer, int checkpoints) {
+      this(path, checkpoints);
+      this.dataBuffer = dataBuffer;
+    }
+
     @Override
     public void run(SourceContext<String> context) throws Exception {
       if (this.dataBuffer == null) {
