This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 642f87cc6b [HUDI-4601] Read error from MOR table after compaction with timestamp partitioning (#6365)
642f87cc6b is described below

commit 642f87cc6b6b2971911d2f27619ee6e6f02e76a4
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Wed Aug 17 09:49:41 2022 +0800

    [HUDI-4601] Read error from MOR table after compaction with timestamp partitioning (#6365)
    
    * read error from mor after compaction
    
    Co-authored-by: 吴文池 <wuwen...@deepexi.com>
---
 .../table/format/cow/CopyOnWriteInputFormat.java   | 11 +++++---
 .../table/format/mor/MergeOnReadInputFormat.java   | 11 +++++---
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 29 +++++++++++++++++-----
 3 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index 719669b532..f04c23fe91 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -108,9 +108,14 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
     LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath(
         fileSplit.getPath());
     LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
-    partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition(
-        partDefaultName.equals(v) ? null : v,
-        fullFieldTypes[fieldNameList.indexOf(k)])));
+    partSpec.forEach((k, v) -> {
+      DataType fieldType = fullFieldTypes[fieldNameList.indexOf(k)];
+      if (!DataTypeUtils.isDatetimeType(fieldType)) {
+        // date time type partition field is formatted specifically,
+        // read directly from the data file to avoid format mismatch or precision loss
+        partObjects.put(k, DataTypeUtils.resolvePartition(partDefaultName.equals(v) ? null : v, fieldType));
+      }
+    });
 
     this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
         utcTimestamp,
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 3ca04986fe..61cf52386b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -299,9 +299,14 @@ public class MergeOnReadInputFormat
         this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
         FilePathUtils.extractPartitionKeys(this.conf));
     LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
-    partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition(
-        defaultPartName.equals(v) ? null : v,
-        fieldTypes.get(fieldNames.indexOf(k)))));
+    partSpec.forEach((k, v) -> {
+      DataType fieldType = fieldTypes.get(fieldNames.indexOf(k));
+      if (!DataTypeUtils.isDatetimeType(fieldType)) {
+        // date time type partition field is formatted specifically,
+        // read directly from the data file to avoid format mismatch or precision loss
+        partObjects.put(k, DataTypeUtils.resolvePartition(defaultPartName.equals(v) ? null : v, fieldType));
+      }
+    });
 
     return ParquetSplitReaderUtil.genPartColumnarRowReader(
         this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index afe3e809b0..bf3abf74b8 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1041,15 +1041,12 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
   }
 
   @ParameterizedTest
-  @EnumSource(value = ExecMode.class)
-  void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
-    // can not read the hive style and timestamp based partitioning table
-    // in batch mode, the code path in CopyOnWriteInputFormat relies on
-    // the value on the partition path to recover the partition value,
-    // but the date format has changed(milliseconds switch to hours).
+  @MethodSource("executionModeAndPartitioningParams")
+  void testWriteAndReadWithTimestampPartitioning(ExecMode execMode, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
         .partitionField("ts") // use timestamp as partition path field
         .end();
     tableEnv.executeSql(hoodieTableDDL);
@@ -1068,6 +1065,26 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
+  @Test
+  void testMergeOnReadCompactionWithTimestampPartitioning() {
+    TableEnvironment tableEnv = batchTableEnv;
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
+        .option(FlinkOptions.COMPACTION_TASKS, 1)
+        .partitionField("ts")
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+    execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+    List<Row> rows = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DAY, FlinkOptions.PARTITION_FORMAT_DASHED_DAY})
   void testWriteAndReadWithDatePartitioning(String partitionFormat) {
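
For context, a standalone sketch of the precision problem the patch works around (illustration only, not part of the commit; the "yyyyMMddHH" pattern below is just an assumed example of an hour-based partition path format):

    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;

    public class TimestampPartitionDemo {
      public static void main(String[] args) {
        // Timestamp as stored in the data file (millisecond precision).
        LocalDateTime ts = LocalDateTime.parse("1970-01-01T00:00:08.123");

        // Value rendered onto the partition path by an hour-based format;
        // everything below hour granularity is dropped.
        String partitionPath = ts.format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
        System.out.println(partitionPath); // prints 1970010100

        // Parsing the path back can only recover hour granularity, which is
        // why the patch skips datetime-typed partition fields when building
        // partObjects and lets the reader take the value from the data file.
      }
    }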
