This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 642f87cc6b [HUDI-4601] Read error from MOR table after compaction with timestamp partitioning (#6365) 642f87cc6b is described below commit 642f87cc6b6b2971911d2f27619ee6e6f02e76a4 Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Wed Aug 17 09:49:41 2022 +0800 [HUDI-4601] Read error from MOR table after compaction with timestamp partitioning (#6365) * read error from mor after compaction Co-authored-by: 吴文池 <wuwen...@deepexi.com> --- .../table/format/cow/CopyOnWriteInputFormat.java | 11 +++++--- .../table/format/mor/MergeOnReadInputFormat.java | 11 +++++--- .../apache/hudi/table/ITTestHoodieDataSource.java | 29 +++++++++++++++++----- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java index 719669b532..f04c23fe91 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java @@ -108,9 +108,14 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath( fileSplit.getPath()); LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>(); - partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition( - partDefaultName.equals(v) ? null : v, - fullFieldTypes[fieldNameList.indexOf(k)]))); + partSpec.forEach((k, v) -> { + DataType fieldType = fullFieldTypes[fieldNameList.indexOf(k)]; + if (!DataTypeUtils.isDatetimeType(fieldType)) { + // date time type partition field is formatted specifically, + // read directly from the data file to avoid format mismatch or precision loss + partObjects.put(k, DataTypeUtils.resolvePartition(partDefaultName.equals(v) ? null : v, fieldType)); + } + }); this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader( utcTimestamp, diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 3ca04986fe..61cf52386b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -299,9 +299,14 @@ public class MergeOnReadInputFormat this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), FilePathUtils.extractPartitionKeys(this.conf)); LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>(); - partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition( - defaultPartName.equals(v) ? null : v, - fieldTypes.get(fieldNames.indexOf(k))))); + partSpec.forEach((k, v) -> { + DataType fieldType = fieldTypes.get(fieldNames.indexOf(k)); + if (!DataTypeUtils.isDatetimeType(fieldType)) { + // date time type partition field is formatted specifically, + // read directly from the data file to avoid format mismatch or precision loss + partObjects.put(k, DataTypeUtils.resolvePartition(defaultPartName.equals(v) ? null : v, fieldType)); + } + }); return ParquetSplitReaderUtil.genPartColumnarRowReader( this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index afe3e809b0..bf3abf74b8 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -1041,15 +1041,12 @@ public class ITTestHoodieDataSource extends AbstractTestBase { } @ParameterizedTest - @EnumSource(value = ExecMode.class) - void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) { - // can not read the hive style and timestamp based partitioning table - // in batch mode, the code path in CopyOnWriteInputFormat relies on - // the value on the partition path to recover the partition value, - // but the date format has changed(milliseconds switch to hours). + @MethodSource("executionModeAndPartitioningParams") + void testWriteAndReadWithTimestampPartitioning(ExecMode execMode, boolean hiveStylePartitioning) { TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv; String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning) .partitionField("ts") // use timestamp as partition path field .end(); tableEnv.executeSql(hoodieTableDDL); @@ -1068,6 +1065,26 @@ public class ITTestHoodieDataSource extends AbstractTestBase { + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); } + @Test + void testMergeOnReadCompactionWithTimestampPartitioning() { + TableEnvironment tableEnv = batchTableEnv; + + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ) + .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1) + .option(FlinkOptions.COMPACTION_TASKS, 1) + .partitionField("ts") + .end(); + tableEnv.executeSql(hoodieTableDDL); + execInsertSql(tableEnv, TestSQL.INSERT_T1); + + List<Row> rows = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + + assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); + } + @ParameterizedTest @ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DAY, FlinkOptions.PARTITION_FORMAT_DASHED_DAY}) void testWriteAndReadWithDatePartitioning(String partitionFormat) {