This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
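The patch below makes each Flink write task track the largest event time it has seen (AbstractWriteFunction.extractTimestamp), report it to the coordinator through WriteMetadataEvent.maxEventTime, and has the coordinator store the minimum of those per-subtask maximums in the commit metadata under the key configured by FlinkOptions.EVENT_TIME_FIELD. The following is only a sketch of how a reader could fetch that watermark back from the latest completed instant, mirroring getLastEventTime() in the new TestWriteFunctionEventTimeExtract; the helper class name, hadoopConf and basePath are illustrative, and HoodieTableMetaClient.builder() is assumed to be the 0.12 meta-client API.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.TimelineUtils;
    import org.apache.hudi.configuration.FlinkOptions;

    // Hypothetical helper, not part of this patch.
    public class EventTimeWatermarkReader {
      public static long readWatermark(Configuration hadoopConf, String basePath) {
        // Assumed 0.12-style meta client builder; hadoopConf/basePath are placeholders.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(hadoopConf)
            .setBasePath(basePath)
            .build();
        // Latest completed commit on the active timeline, as in getLastEventTime() of the new test.
        HoodieInstant latest = metaClient.getActiveTimeline()
            .getCommitsTimeline()
            .filterCompletedInstants()
            .getReverseOrderedInstants()
            .findFirst()
            .orElse(null);
        if (latest == null) {
          return -1L;
        }
        // The coordinator stores the min of all subtasks' max event times under this key.
        return Long.parseLong(
            TimelineUtils.getMetadataValue(metaClient, FlinkOptions.EVENT_TIME_FIELD.key(), latest)
                .orElseGet(() -> "-1"));
      }
    }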
commit 9fbf3b920d24e3e9f517beca10a2f4d892ae2d06
Author: jerryyue <jerry...@didiglobal.com>
AuthorDate: Fri Oct 28 19:16:46 2022 +0800

    [HUDI-5095] Flink: Stores a special watermark (flag) to identify the current progress of writing data
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  28 +++
 .../hudi/common/table/timeline/TimelineUtils.java  |   2 +-
 .../org/apache/hudi/common/util/DateTimeUtils.java |   8 +
 .../apache/hudi/configuration/FlinkOptions.java    |   7 +
 .../org/apache/hudi/sink/StreamWriteFunction.java  |   6 +
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  30 +++
 .../hudi/sink/append/AppendWriteFunction.java      |   2 +-
 .../hudi/sink/bulk/BulkInsertWriteFunction.java    |  10 +-
 .../sink/common/AbstractStreamWriteFunction.java   |  11 +-
 .../hudi/sink/common/AbstractWriteFunction.java    | 103 +++++++++
 .../hudi/sink/common/AbstractWriteOperator.java    |   9 +
 .../apache/hudi/sink/event/WriteMetadataEvent.java |  31 ++-
 .../java/org/apache/hudi/util/DataTypeUtils.java   | 141 +++++++++++++
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |  35 ++++
 .../sink/TestWriteFunctionEventTimeExtract.java    | 232 +++++++++++++++++++++
 .../sink/utils/StreamWriteFunctionWrapper.java     |   5 +-
 .../apache/hudi/sink/utils/TestDataTypeUtils.java  |  45 ++++
 .../hudi/utils/source/ContinuousFileSource.java    |   5 +
 18 files changed, 692 insertions(+), 18 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index a352e86b96..735940eea4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -75,6 +75,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Deque;
 import java.util.LinkedList;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
@@ -657,6 +658,33 @@ public class HoodieAvroUtils {
     return fieldValue;
   }
 
+  public static Long getNestedFieldValAsLong(GenericRecord record, String fieldName, boolean consistentLogicalTimestampEnabled, Long defaultValue) {
+    GenericRecord valueNode = record;
+    Object fieldValue = valueNode.get(fieldName);
+    Schema fieldSchema = valueNode.getSchema().getField(fieldName).schema();
+    if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
+      return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString())).toEpochDay();
+    } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMillis() && consistentLogicalTimestampEnabled) {
+      return new Timestamp(Long.parseLong(fieldValue.toString())).getTime();
+    } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMicros() && consistentLogicalTimestampEnabled) {
+      return new Timestamp(Long.parseLong(fieldValue.toString()) / 1000).getTime();
+    } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+      Decimal dc = (Decimal) fieldSchema.getLogicalType();
+      DecimalConversion decimalConversion = new DecimalConversion();
+      if (fieldSchema.getType() == Schema.Type.FIXED) {
+        return decimalConversion.fromFixed((GenericFixed) fieldValue, fieldSchema,
+            LogicalTypes.decimal(dc.getPrecision(), dc.getScale())).longValue();
+      } else if (fieldSchema.getType() == Schema.Type.BYTES) {
+        ByteBuffer byteBuffer = (ByteBuffer) fieldValue;
+        BigDecimal convertedValue = decimalConversion.fromBytes(byteBuffer, fieldSchema,
+            LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
+        byteBuffer.rewind();
+        return convertedValue.longValue();
+      }
+    }
+    return
+        Objects.isNull(fieldValue) ? defaultValue : Long.parseLong(fieldValue.toString());
+  }
+
   public static Schema getNullSchema() {
     return Schema.create(Schema.Type.NULL);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 75493e7b46..1b8450eecc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -173,7 +173,7 @@ public class TimelineUtils {
         HoodieInstant::getTimestamp, instant -> getMetadataValue(metaClient, extraMetadataKey, instant)));
   }
 
-  private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient, String extraMetadataKey, HoodieInstant instant) {
+  public static Option<String> getMetadataValue(HoodieTableMetaClient metaClient, String extraMetadataKey, HoodieInstant instant) {
     try {
       LOG.info("reading checkpoint info for:" + instant + " key: " + extraMetadataKey);
       HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
index cf90eff8d6..1b1e845dfc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
@@ -22,7 +22,9 @@ package org.apache.hudi.common.util;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
 import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
 import java.time.temporal.ChronoUnit;
@@ -39,6 +41,8 @@ public class DateTimeUtils {
   private static final Map<String, ChronoUnit> LABEL_TO_UNIT_MAP = Collections.unmodifiableMap(initMap());
 
+  public static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+
   /**
    * Converts provided microseconds (from epoch) to {@link Instant}
    */
@@ -172,6 +176,10 @@ public class DateTimeUtils {
         .format(dtf);
   }
 
+  public static long millisFromTimestamp(LocalDateTime dateTime) {
+    return ChronoUnit.MILLIS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
+  }
+
   /**
    * Enum which defines time unit, mostly used to parse value from configuration file.
*/ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 31c8b554c0..aa1e3297bd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.model.EventTimeAvroPayload; import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.HoodiePayloadProps; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieIndexConfig; @@ -255,6 +256,12 @@ public class FlinkOptions extends HoodieConfig { .defaultValue(60)// default 1 minute .withDescription("Check interval for streaming read of SECOND, default 1 minute"); + public static final ConfigOption<String> EVENT_TIME_FIELD = ConfigOptions + .key(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY) + .stringType() + .noDefaultValue() + .withDescription("event time field name for flink"); + // this option is experimental public static final ConfigOption<Boolean> READ_STREAMING_SKIP_COMPACT = ConfigOptions .key("read.streaming.skip_compaction") diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index a0f994f04a..bd1b1e68f1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -446,6 +446,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { .writeStatus(writeStatus) .lastBatch(false) .endInput(false) + .maxEventTime(this.currentTimeStamp) .build(); this.eventGateway.sendEventToCoordinator(event); @@ -482,14 +483,19 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant); writeStatus = Collections.emptyList(); } + final WriteMetadataEvent event = WriteMetadataEvent.builder() .taskID(taskID) .instantTime(currentInstant) .writeStatus(writeStatus) .lastBatch(true) .endInput(endInput) + .maxEventTime(this.currentTimeStamp) .build(); + LOG.info("Write MetadataEvent in subtask [{}] for instant [{}] maxEventTime [{}]", + taskID, currentInstant, this.currentTimeStamp); + this.eventGateway.sendEventToCoordinator(event); this.buckets.clear(); this.tracer.reset(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 670748b90f..578bb10db5 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -56,6 +56,7 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Locale; 
@@ -147,6 +148,10 @@ public class StreamWriteOperatorCoordinator */ private transient TableState tableState; + private transient Long minEventTime = Long.MAX_VALUE; + + private final boolean commitEventTimeEnable; + /** * The checkpoint metadata. */ @@ -165,6 +170,7 @@ public class StreamWriteOperatorCoordinator this.context = context; this.parallelism = context.currentParallelism(); this.hiveConf = new SerializableConfiguration(HadoopConfigurations.getHiveConf(conf)); + this.commitEventTimeEnable = Objects.nonNull(conf.get(FlinkOptions.EVENT_TIME_FIELD)); } @Override @@ -503,10 +509,30 @@ public class StreamWriteOperatorCoordinator sendCommitAckEvents(checkpointId); return false; } + setMinEventTime(); doCommit(instant, writeResults); + resetMinEventTime(); return true; } + public void setMinEventTime() { + if (commitEventTimeEnable) { + LOG.info("[setMinEventTime] receive event time for current commit: {} ", Arrays.stream(eventBuffer).map(WriteMetadataEvent::getMaxEventTime).map(String::valueOf) + .collect(Collectors.joining(", "))); + this.minEventTime = Arrays.stream(eventBuffer) + .filter(Objects::nonNull) + .filter(maxEventTime -> maxEventTime.getMaxEventTime() > 0) + .map(WriteMetadataEvent::getMaxEventTime) + .min(Comparator.naturalOrder()) + .map(aLong -> Math.min(aLong, this.minEventTime)).orElse(Long.MAX_VALUE); + LOG.info("[setMinEventTime] minEventTime: {} ", this.minEventTime); + } + } + + public void resetMinEventTime() { + this.minEventTime = Long.MAX_VALUE; + } + /** * Performs the actual commit action. */ @@ -519,6 +545,10 @@ public class StreamWriteOperatorCoordinator if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) { HashMap<String, String> checkpointCommitMetadata = new HashMap<>(); + if (commitEventTimeEnable) { + LOG.info("[doCommit] minEventTime: {} ", this.minEventTime); + checkpointCommitMetadata.put(FlinkOptions.EVENT_TIME_FIELD.key(), this.minEventTime.toString()); + } if (hasErrors) { LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total=" + totalErrorRecords + "/" + totalRecords); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java index e1db125731..afecc6cc49 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java @@ -59,7 +59,6 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> { * Table row type. */ private final RowType rowType; - /** * Constructs an AppendWriteFunction. 
* @@ -133,6 +132,7 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> { .writeStatus(writeStatus) .lastBatch(true) .endInput(endInput) + .maxEventTime(this.currentTimeStamp) .build(); this.eventGateway.sendEventToCoordinator(event); // nullify the write helper for next ckp diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java index 06d9fcd851..63560fa123 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -63,12 +63,6 @@ public class BulkInsertWriteFunction<I> * Helper class for bulk insert mode. */ private transient BulkInsertWriterHelper writerHelper; - - /** - * Config options. - */ - private final Configuration config; - /** * Table row type. */ @@ -105,7 +99,7 @@ public class BulkInsertWriteFunction<I> * @param config The config options */ public BulkInsertWriteFunction(Configuration config, RowType rowType) { - this.config = config; + super(config); this.rowType = rowType; } @@ -144,6 +138,7 @@ public class BulkInsertWriteFunction<I> .writeStatus(writeStatus) .lastBatch(true) .endInput(true) + .maxEventTime(this.currentTimeStamp) .build(); this.eventGateway.sendEventToCoordinator(event); } @@ -180,6 +175,7 @@ public class BulkInsertWriteFunction<I> .bootstrap(true) .build(); this.eventGateway.sendEventToCoordinator(event); + resetEventTime(); LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 674cd3588a..b4569894a2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -58,12 +58,6 @@ public abstract class AbstractStreamWriteFunction<I> implements CheckpointedFunction { private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamWriteFunction.class); - - /** - * Config options. - */ - protected final Configuration config; - /** * Id of current subtask. */ @@ -132,7 +126,7 @@ public abstract class AbstractStreamWriteFunction<I> * @param config The config options */ public AbstractStreamWriteFunction(Configuration config) { - this.config = config; + super(config); } @Override @@ -166,6 +160,8 @@ public abstract class AbstractStreamWriteFunction<I> snapshotState(); // Reload the snapshot state as the current state. 
reloadWriteMetaState(); + //reset event time for current checkpoint interval + resetEventTime(); } public abstract void snapshotState(); @@ -225,6 +221,7 @@ public abstract class AbstractStreamWriteFunction<I> .instantTime(currentInstant) .writeStatus(new ArrayList<>(writeStatuses)) .bootstrap(true) + .maxEventTime(this.currentTimeStamp) .build(); this.writeMetadataState.add(event); writeStatuses.clear(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java index 9e131ff91e..7756a1c2fa 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java @@ -18,10 +18,26 @@ package org.apache.hudi.sink.common; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.DataTypeUtils; +import org.apache.hudi.util.StreamerUtil; + +import java.io.IOException; +import java.util.Objects; /** * Base class for write function. @@ -29,6 +45,39 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput; * @param <I> the input type */ public abstract class AbstractWriteFunction<I> extends ProcessFunction<I, Object> implements BoundedOneInput { + + /** + * Config options. + */ + protected final Configuration config; + /** + * The current event time this write task seen for now. + */ + protected Long currentTimeStamp = -1L; + + protected String eventTimeField; + + protected int eventTimeFieldIndex; + + protected LogicalType eventTimeDataType; + + protected RowData.FieldGetter eventTimeFieldGetter; + + protected Schema writeSchema; + + public AbstractWriteFunction(Configuration config) { + this.config = config; + if (config.containsKey(FlinkOptions.EVENT_TIME_FIELD.key())) { + this.writeSchema = StreamerUtil.getSourceSchema(config); + this.eventTimeField = config.getString(FlinkOptions.EVENT_TIME_FIELD); + this.eventTimeFieldIndex = this.writeSchema.getField(this.eventTimeField).pos(); + RowType rowType = + (RowType) AvroSchemaConverter.convertToDataType(this.writeSchema).getLogicalType(); + this.eventTimeDataType = rowType.getTypeAt(eventTimeFieldIndex); + this.eventTimeFieldGetter = RowData.createFieldGetter(eventTimeDataType, eventTimeFieldIndex); + } + } + /** * Sets up the event gateway. */ @@ -45,4 +94,58 @@ public abstract class AbstractWriteFunction<I> extends ProcessFunction<I, Object * @param event The event */ public abstract void handleOperatorEvent(OperatorEvent event); + + /** + * Extract TimeStamp from input value with specified event time field. 
+ * + * @param value The input value + * @return the new timestamp for current. + */ + public long extractTimestamp(I value) { + if (value instanceof HoodieAvroRecord) { + return extractTimestamp((HoodieAvroRecord) value); + } + return extractTimestamp((RowData) value); + } + + /** + * whether enable extract event time stamp from record. + * + * @return flag to enable or disable the event time extract. + */ + public boolean extractTimeStampEnable() { + return Objects.nonNull(this.eventTimeField); + } + + public long extractTimestamp(HoodieAvroRecord value) { + try { + GenericRecord record = (GenericRecord) value.getData() + .getInsertValue(this.writeSchema).get(); + Long eventTime = HoodieAvroUtils.getNestedFieldValAsLong( + record, eventTimeField, + true, -1L); + this.currentTimeStamp = Math.max(eventTime, this.currentTimeStamp); + return eventTime; + } catch (IOException e) { + throw new HoodieException("extract event time failed. " + e); + } + } + + public long extractTimestamp(RowData value) { + Long eventTime = -1L; + try { + Object eventTimeObject = + value.isNullAt(eventTimeFieldIndex) ? -1L : this.eventTimeFieldGetter.getFieldOrNull(value); + eventTime = DataTypeUtils.getAsLong(eventTimeObject, this.eventTimeDataType); + this.currentTimeStamp = Math.max(eventTime, this.currentTimeStamp); + } catch (Throwable e) { + throw new HoodieException(String.format("eventTimeFieldIndex=%s, eventTimeDataType=%s, eventTime=%s. ", + this.eventTimeFieldIndex, this.eventTimeDataType, eventTime) + e); + } + return eventTime; + } + + public void resetEventTime() { + this.currentTimeStamp = -1L; + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java index e339ccb0b7..bd41898487 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** * Base class for write operator. 
@@ -39,6 +40,14 @@ public abstract class AbstractWriteOperator<I> this.function = function; } + @Override + public void processElement(StreamRecord<I> element) throws Exception { + if (this.function.extractTimeStampEnable()) { + this.function.extractTimestamp(element.getValue()); + } + super.processElement(element); + } + public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { this.function.setOperatorEventGateway(operatorEventGateway); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java index 0eb06bdd82..f68052e895 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java @@ -54,6 +54,7 @@ public class WriteMetadataEvent implements OperatorEvent { */ private boolean bootstrap; + private Long maxEventTime = Long.MIN_VALUE; /** * Creates an event. * @@ -81,6 +82,18 @@ public class WriteMetadataEvent implements OperatorEvent { this.bootstrap = bootstrap; } + private WriteMetadataEvent( + int taskID, + String instantTime, + List<WriteStatus> writeStatuses, + boolean lastBatch, + boolean endInput, + boolean bootstrap, + Long maxEventTIme) { + this(taskID, instantTime, writeStatuses, lastBatch, endInput, bootstrap); + this.maxEventTime = maxEventTIme; + } + // default constructor for efficient serialization public WriteMetadataEvent() { } @@ -140,6 +153,14 @@ public class WriteMetadataEvent implements OperatorEvent { this.lastBatch = lastBatch; } + public Long getMaxEventTime() { + return maxEventTime; + } + + public void setMaxEventTime(Long maxEventTime) { + this.maxEventTime = maxEventTime; + } + /** * Merges this event with given {@link WriteMetadataEvent} {@code other}. 
* @@ -172,6 +193,7 @@ public class WriteMetadataEvent implements OperatorEvent { + ", lastBatch=" + lastBatch + ", endInput=" + endInput + ", bootstrap=" + bootstrap + + ", maxEventTime=" + maxEventTime + '}'; } @@ -209,11 +231,13 @@ public class WriteMetadataEvent implements OperatorEvent { private boolean endInput = false; private boolean bootstrap = false; + private Long maxEventTime = Long.MAX_VALUE; + public WriteMetadataEvent build() { Objects.requireNonNull(taskID); Objects.requireNonNull(instantTime); Objects.requireNonNull(writeStatus); - return new WriteMetadataEvent(taskID, instantTime, writeStatus, lastBatch, endInput, bootstrap); + return new WriteMetadataEvent(taskID, instantTime, writeStatus, lastBatch, endInput, bootstrap, maxEventTime); } public Builder taskID(int taskID) { @@ -226,6 +250,11 @@ public class WriteMetadataEvent implements OperatorEvent { return this; } + public Builder maxEventTime(Long maxEventTime) { + this.maxEventTime = maxEventTime; + return this; + } + public Builder writeStatus(List<WriteStatus> writeStatus) { this.writeStatus = writeStatus; return this; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java index e91432b5e3..6c4bb9925c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java @@ -18,6 +18,8 @@ package org.apache.hudi.util; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; @@ -26,15 +28,41 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; +import javax.annotation.Nullable; import java.math.BigDecimal; +import java.time.DateTimeException; import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.format.DateTimeParseException; +import java.time.temporal.TemporalAccessor; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; + +import static java.time.temporal.ChronoField.DAY_OF_MONTH; +import static java.time.temporal.ChronoField.HOUR_OF_DAY; +import static java.time.temporal.ChronoField.MINUTE_OF_HOUR; +import static java.time.temporal.ChronoField.MONTH_OF_YEAR; +import static java.time.temporal.ChronoField.NANO_OF_SECOND; +import static java.time.temporal.ChronoField.SECOND_OF_MINUTE; +import static java.time.temporal.ChronoField.YEAR; /** * Utilities for {@link org.apache.flink.table.types.DataType}. */ public class DataTypeUtils { + + private static final DateTimeFormatter DEFAULT_TIMESTAMP_FORMATTER = + new DateTimeFormatterBuilder() + .appendPattern("yyyy-[MM][M]-[dd][d]") + .optionalStart() + .appendPattern(" [HH][H]:[mm][m]:[ss][s]") + .appendFraction(NANO_OF_SECOND, 0, 9, true) + .optionalEnd() + .toFormatter(); + /** * Returns whether the given type is TIMESTAMP type. 
*/ @@ -123,4 +151,117 @@ public class DataTypeUtils { "Can not convert %s to type %s for partition value", partition, type)); } } + + /** + * Ensures the give columns of the row data type are not nullable(for example, the primary keys). + * + * @param dataType The row data type, datatype logicaltype must be rowtype + * @param pkColumns The primary keys + * @return a new row data type if any column nullability is tweaked or the original data type + */ + public static DataType ensureColumnsAsNonNullable(DataType dataType, @Nullable List<String> pkColumns) { + if (pkColumns == null || pkColumns.isEmpty()) { + return dataType; + } + LogicalType dataTypeLogicalType = dataType.getLogicalType(); + if (!(dataTypeLogicalType instanceof RowType)) { + throw new RuntimeException("The datatype to be converted must be row type, but this type is :" + dataTypeLogicalType.getClass()); + } + RowType rowType = (RowType) dataTypeLogicalType; + List<DataType> originalFieldTypes = dataType.getChildren(); + List<String> fieldNames = rowType.getFieldNames(); + List<DataType> fieldTypes = new ArrayList<>(); + boolean tweaked = false; + for (int i = 0; i < fieldNames.size(); i++) { + if (pkColumns.contains(fieldNames.get(i)) && rowType.getTypeAt(i).isNullable()) { + fieldTypes.add(originalFieldTypes.get(i).notNull()); + tweaked = true; + } else { + fieldTypes.add(originalFieldTypes.get(i)); + } + } + if (!tweaked) { + return dataType; + } + List<DataTypes.Field> fields = new ArrayList<>(); + for (int i = 0; i < fieldNames.size(); i++) { + fields.add(DataTypes.FIELD(fieldNames.get(i), fieldTypes.get(i))); + } + return DataTypes.ROW(fields.stream().toArray(DataTypes.Field[]::new)).notNull(); + } + + public static Long getAsLong(Object value, LogicalType logicalType) { + if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE + || logicalType.getTypeRoot() == LogicalTypeRoot.DATE + || logicalType.getTypeRoot() == LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE) { + return toMills(toLocalDateTime(value.toString())); + } + return Long.parseLong(value.toString()); + } + + public static LocalDateTime toLocalDateTime(String timestampString) { + try { + return parseTimestampData(timestampString); + } catch (DateTimeParseException e) { + return LocalDateTime.parse(timestampString); + } + } + + public static LocalDateTime parseTimestampData(String dateStr) throws DateTimeException { + // Precision is hardcoded to match signature of TO_TIMESTAMP + // https://issues.apache.org/jira/browse/FLINK-14925 + return parseTimestampData(dateStr, 3); + } + + public static LocalDateTime parseTimestampData(String dateStr, int precision) + throws DateTimeException { + return fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), precision); + } + + public static long toMills(LocalDateTime dateTime) { + return TimestampData.fromLocalDateTime(dateTime).getMillisecond(); + } + + private static LocalDateTime fromTemporalAccessor(TemporalAccessor accessor, int precision) { + // complement year with 1970 + int year = accessor.isSupported(YEAR) ? accessor.get(YEAR) : 1970; + // complement month with 1 + int month = accessor.isSupported(MONTH_OF_YEAR) ? accessor.get(MONTH_OF_YEAR) : 1; + // complement day with 1 + int day = accessor.isSupported(DAY_OF_MONTH) ? accessor.get(DAY_OF_MONTH) : 1; + // complement hour with 0 + int hour = accessor.isSupported(HOUR_OF_DAY) ? accessor.get(HOUR_OF_DAY) : 0; + // complement minute with 0 + int minute = accessor.isSupported(MINUTE_OF_HOUR) ? 
accessor.get(MINUTE_OF_HOUR) : 0; + // complement second with 0 + int second = accessor.isSupported(SECOND_OF_MINUTE) ? accessor.get(SECOND_OF_MINUTE) : 0; + // complement nano_of_second with 0 + int nanoOfSecond = accessor.isSupported(NANO_OF_SECOND) ? accessor.get(NANO_OF_SECOND) : 0; + + if (precision == 0) { + nanoOfSecond = 0; + } else if (precision != 9) { + nanoOfSecond = (int) floor(nanoOfSecond, powerX(10, 9 - precision)); + } + + return LocalDateTime.of(year, month, day, hour, minute, second, nanoOfSecond); + } + + private static long floor(long a, long b) { + long r = a % b; + if (r < 0) { + return a - r - b; + } else { + return a - r; + } + } + + private static long powerX(long a, long b) { + long x = 1; + while (b > 0) { + x *= a; + --b; + } + return x; + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index 193c0abcd8..9556a94083 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -18,14 +18,21 @@ package org.apache.hudi.sink; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsInference; import org.apache.hudi.sink.transform.ChainedTransformer; import org.apache.hudi.sink.transform.Transformer; import org.apache.hudi.sink.utils.Pipelines; +import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.HoodiePipeline; import org.apache.hudi.util.StreamerUtil; @@ -67,6 +74,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; /** * Integration test for Flink Hoodie stream sink. 
@@ -109,6 +119,31 @@ public class ITTestDataStreamWrite extends TestLogger { testWriteToHoodie(conf, "cow_write", 2, EXPECTED); } + @ParameterizedTest + @ValueSource(strings = {"BUCKET", "FLINK_STATE"}) + public void testWriteCopyOnWriteWithEventTimeExtract(String indexType) throws Exception { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString()); + conf.setString(FlinkOptions.INDEX_TYPE, indexType); + conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1); + conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id"); + conf.setBoolean(FlinkOptions.PRE_COMBINE, true); + conf.setString(FlinkOptions.EVENT_TIME_FIELD, "ts"); + + testWriteToHoodie(conf, "cow_write", 1, EXPECTED); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(conf.getString(FlinkOptions.PATH)); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(conf.getString(FlinkOptions.PATH)).build(); + HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient); + List<HoodieInstant> hoodieInstants = table.getFileSystemView().getTimeline().getInstants().collect(Collectors.toList()); + assertEquals(1, hoodieInstants.size()); + byte[] data = table.getFileSystemView().getTimeline().getInstantDetails(hoodieInstants.get(0)).get(); + Map<String, String> extraMetadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class).getExtraMetadata(); + if (indexType.equals("BUCKET")) { + assertEquals("2000", extraMetadata.get(FlinkOptions.EVENT_TIME_FIELD.key())); + } else { + assertEquals("4000", extraMetadata.get(FlinkOptions.EVENT_TIME_FIELD.key())); + } + } + @Test public void testWriteCopyOnWriteWithTransformer() throws Exception { Transformer transformer = (ds) -> ds.map((rowdata) -> { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteFunctionEventTimeExtract.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteFunctionEventTimeExtract.java new file mode 100644 index 0000000000..52ba4e07b9 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteFunctionEventTimeExtract.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; +import org.apache.flink.util.TestLogger; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.DateTimeUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.util.HoodiePipeline; +import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.source.ContinuousFileSource; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +public class TestWriteFunctionEventTimeExtract extends TestLogger { + + @TempDir + File tempFile; + + public static final DataType ROW_DATA_TYPE = DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.VARCHAR(20)),// record key + DataTypes.FIELD("data", DataTypes.VARCHAR(10)), + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field + DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) + .notNull(); + + private static Stream<Arguments> parameters() { + return Stream.of( + Arguments.of("COPY_ON_WRITE", "FLINK_STATE", 1, false), + Arguments.of("COPY_ON_WRITE", "FLINK_STATE", 2, true), + Arguments.of("MERGE_ON_READ", "FLINK_STATE", 1, false), + Arguments.of("MERGE_ON_READ", "FLINK_STATE", 2, true), + Arguments.of("COPY_ON_WRITE", "BUCKET", 1, false), + Arguments.of("COPY_ON_WRITE", "BUCKET", 2, true), + Arguments.of("MERGE_ON_READ", "BUCKET", 1, false), + Arguments.of("MERGE_ON_READ", "BUCKET", 2, true), + Arguments.of("MERGE_ON_READ", "NON_INDEX", 1, false), + Arguments.of("MERGE_ON_READ", "NON_INDEX", 2, true)); + } + + @ParameterizedTest + @MethodSource("parameters") + void testWriteWithEventTime(String tableType, String indexType, int parallelism, boolean partitioned) throws Exception { + Configuration conf = getConf(tableType, indexType, parallelism); + + List<Row> rows = + Lists.newArrayList( + Row.of("1", "hello", LocalDateTime.parse("2012-12-12T12:12:12")), + 
Row.of("2", "world", LocalDateTime.parse("2012-12-12T12:12:01")), + Row.of("3", "world", LocalDateTime.parse("2012-12-12T12:12:02")), + Row.of("4", "foo", LocalDateTime.parse("2012-12-12T12:12:10"))); + + // write rowData with eventTime + testWriteToHoodie(conf, parallelism, partitioned, ((RowType) ROW_DATA_TYPE.getLogicalType()), rows); + + // check eventTime + checkWriteEventTime(indexType, parallelism, conf); + } + + public Configuration getConf(String tableType, String indexType, int parallelism) { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath(), ROW_DATA_TYPE); + conf.setString(FlinkOptions.TABLE_TYPE, tableType); + conf.setString(FlinkOptions.INDEX_TYPE, indexType); + conf.setString(FlinkOptions.RECORD_KEY_FIELD, "id"); + conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id"); + conf.setString(FlinkOptions.EVENT_TIME_FIELD, "ts"); + conf.setInteger(FlinkOptions.WRITE_TASKS, parallelism); + conf.setString("hoodie.metrics.on", "false"); + conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, parallelism); + return conf; + } + + private void checkWriteEventTime(String indexType, int parallelism, Configuration conf) throws IOException { + if (parallelism <= 1) { // single mode + Assertions.assertEquals(DateTimeUtils.millisFromTimestamp(LocalDateTime.parse("2012-12-12T12:12:12")), + getLastEventTime(conf)); + } else if (indexType.equals("BUCKET") || indexType.equals("NON_INDEX")) { // hash mode + Assertions.assertEquals(DateTimeUtils.millisFromTimestamp(LocalDateTime.parse("2012-12-12T12:12:10")), + getLastEventTime(conf)); + } else { // partition mode + Assertions.assertEquals(DateTimeUtils.millisFromTimestamp(LocalDateTime.parse("2012-12-12T12:12:02")), + getLastEventTime(conf)); + } + } + + public long getLastEventTime(Configuration conf) throws IOException { + String path = conf.getString(FlinkOptions.PATH); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(path); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); + HoodieInstant hoodieInstant = timeline.getReverseOrderedInstants() + .findFirst().orElse(null); + if (hoodieInstant == null) { + return -1L; + } else { + Option<String> eventtime = TimelineUtils.getMetadataValue(metaClient, FlinkOptions.EVENT_TIME_FIELD.key(), + hoodieInstant); + return Long.parseLong(eventtime.orElseGet(() -> "-1")); + } + } + + public List<String> getRowDataString(List<Row> rows, boolean partitioned) { + List<String> dataBuffer = new ArrayList<>(); + for (Row row : rows) { + String id = (String) row.getField(0); + String data = (String) row.getField(1); + LocalDateTime ts = (LocalDateTime) row.getField(2); + String rowData = String.format("{\"id\": \"%s\", \"data\": \"%s\", \"ts\": \"%s\", \"partition\": \"%s\"}", + id, data, ts.toString(), partitioned ? 
data : "par"); + dataBuffer.add(rowData); + } + return dataBuffer; + } + + private void testWriteToHoodie( + Configuration conf, + int parallelism, + boolean partitioned, + RowType rowType, + List<Row> rows) throws Exception { + + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + execEnv.getConfig().disableObjectReuse(); + execEnv.setParallelism(parallelism); + execEnv.setMaxParallelism(parallelism); + // set up checkpoint interval + execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE); + execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + + JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( + rowType, + InternalTypeInfo.of(rowType), + false, + true, + TimestampFormat.ISO_8601 + ); + + boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name()); + + List<String> dataBuffer = getRowDataString(rows, partitioned); + + DataStream<RowData> dataStream; + dataStream = execEnv + // use continuous file source to trigger checkpoint + .addSource(new ContinuousFileSource.BoundedSourceFunction(null, dataBuffer, 1)) + .name("continuous_file_source") + .setParallelism(1) + .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) + .setParallelism(1); + + + //sink to hoodie table use low-level sink api. + HoodiePipeline.Builder builder = HoodiePipeline.builder("t_event_sink") + .column("id string not null") + .column("data string") + .column("`ts` timestamp(3)") + .column("`partition` string") + .pk("id") + .partition("partition") + .options(conf.toMap()); + + builder.sink(dataStream, false); + execute(execEnv, isMor, "EventTime_Sink_Test"); + } + + public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception { + if (isMor) { + JobClient client = execEnv.executeAsync(jobName); + if (client.getJobStatus().get() != JobStatus.FAILED) { + try { + TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish + client.cancel(); + } catch (Throwable var1) { + // ignored + } + } + } else { + // wait for the streaming job to finish + execEnv.execute(jobName); + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index b83f3cc478..d31a884827 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -171,8 +171,11 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> { } else { bucketAssignerFunction.processElement(hoodieRecord, null, collector); bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey()); - writeFunction.processElement(collector.getVal(), null, null); + if (writeFunction.extractTimeStampEnable()) { + writeFunction.extractTimestamp(collector.getVal()); + } } + writeFunction.processElement(collector.getVal(), null, null); } public WriteMetadataEvent[] getEventBuffer() { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestDataTypeUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestDataTypeUtils.java new file mode 100644 index 0000000000..cda252ca5d --- /dev/null +++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestDataTypeUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.flink.table.api.DataTypes; +import org.apache.hudi.util.DataTypeUtils; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestDataTypeUtils { + @Test + public void testGetAsLong() { + long t1 = DataTypeUtils.getAsLong("2012-12-12T12:12:12", DataTypes.TIMESTAMP(3).getLogicalType()); + assertEquals(1355314332000L, t1); + + long t2 = DataTypeUtils.getAsLong("2012-12-12 12:12:12", DataTypes.TIME().getLogicalType()); + assertEquals(1355314332000L, t2); + + long t3 = DataTypeUtils.getAsLong("2012-12-12", DataTypes.DATE().getLogicalType()); + assertEquals(1355270400000L, t3); + + long t4 = DataTypeUtils.getAsLong(100L, DataTypes.BIGINT().getLogicalType()); + assertEquals(100L, t4); + + long t5 = DataTypeUtils.getAsLong(100, DataTypes.INT().getLogicalType()); + assertEquals(100, t5); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java index 2830eefef0..1715593077 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java @@ -133,6 +133,11 @@ public class ContinuousFileSource implements ScanTableSource { this.checkpoints = checkpoints; } + public BoundedSourceFunction(Path path, List<String> dataBuffer, int checkpoints) { + this(path, checkpoints); + this.dataBuffer = dataBuffer; + } + @Override public void run(SourceContext<String> context) throws Exception { if (this.dataBuffer == null) {