This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.14.1-fix-timestamp-millis
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e6ee3fe24be262693f05e65dfcb2d096ddded9d9
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Mar 12 07:31:49 2025 -0700

    WIP
---
 .../hudi/testutils/HoodieClientTestUtils.java      | 10 +++
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  9 ++-
 .../parquet/HoodieParquetFileFormatHelper.scala    |  4 +-
 .../apache/hudi/functional/TestCOWDataSource.scala | 57 +++++++++++++++
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 83 ++++++++++++++++++++++
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 26 +++++++
 6 files changed, 187 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index a7808ea93824..ddbf3428dede 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -97,6 +97,16 @@ public class HoodieClientTestUtils {
         .setMaster("local[4]")
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
+        //.set("spark.sql.avro.datetimeRebaseModeInRead", "LEGACY")
+        //.set("spark.sql.avro.datetimeRebaseModeInWrite", "LEGACY")
+        //.set("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "LEGACY")
+        //.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
+        //.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
+        //.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")
+        //.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")
+        //.set("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY")
+        //.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
+        //.set("spark.sql.parquet.writeLegacyFormat", "true")
         .set("spark.sql.shuffle.partitions", "4")
         .set("spark.default.parallelism", "4");
 
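
The commented-out entries above are Spark's datetime-rebase knobs for Parquet and Avro. For reference, a minimal Scala sketch of setting the Parquet pair on a live session; the app name is illustrative, and the config keys are standard Spark 3.x SQL configs:

    import org.apache.spark.sql.SparkSession

    // Local session; "rebase-demo" is an illustrative app name.
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("rebase-demo")
      .getOrCreate()

    // Session-level equivalents of two of the SparkConf entries above.
    // "LEGACY" rebases values between the Julian and Proleptic Gregorian calendars.
    spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY")
    spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
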
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 0098ee54c2bc..364bb2f8e5db 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -180,7 +180,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       }
     }
 
-    (avroSchema, internalSchemaOpt)
+    if (avroSchema.getName.equals("HudiRecord")) {
+      val newSchema = new Schema.Parser().parse(
+        "{\"type\":\"record\",\"name\":\"HudiRecord\",\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null},{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null},{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null},{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null},{\"name\":\"_hoodie_file_name\",\"type\" [...]
+      )
+      (newSchema, internalSchemaOpt)
+    } else {
+      (avroSchema, internalSchemaOpt)
+    }
   }
 
   protected lazy val tableStructSchema: StructType = {
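
The hunk above swaps in a hardcoded Avro schema (truncated by the mailer) whenever the table schema is named "HudiRecord". A minimal sketch of the Schema.Parser API it relies on, with a shortened field list standing in for the full schema string:

    import org.apache.avro.Schema

    // Shortened stand-in for the full schema string in the patch.
    val json =
      """{"type": "record", "name": "HudiRecord", "fields": [
        |  {"name": "_hoodie_commit_time", "type": ["null", "string"], "default": null}
        |]}""".stripMargin

    // Parse the JSON into an org.apache.avro.Schema and check its record name.
    val schema: Schema = new Schema.Parser().parse(json)
    assert(schema.getName == "HudiRecord")
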
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
index 599bbebe4f6c..6493ce0e3083 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.metadata.FileMetaData
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType, StructField, StructType, TimestampType}
 
 object HoodieParquetFileFormatHelper {
 
@@ -45,6 +45,8 @@ object HoodieParquetFileFormatHelper {
   }
 
   def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = (requiredType, fileType) match {
+    case (LongType, TimestampType) => true
+
     case (requiredType, fileType) if requiredType == fileType => true
 
     case (ArrayType(rt, _), ArrayType(ft, _)) =>
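
The new case makes a requested LongType accept a file column typed as TimestampType, which is how a Parquet column written with the timestamp-millis logical type surfaces. A self-contained sketch of this style of recursive structural check (simplified, not the full helper):

    import org.apache.spark.sql.types._

    // Structural compatibility with one escape hatch: a long column may be
    // backed by a file-side timestamp (timestamp-millis stored as INT64).
    def isCompatible(required: DataType, file: DataType): Boolean = (required, file) match {
      case (LongType, TimestampType) => true
      case (r, f) if r == f => true
      case (ArrayType(r, _), ArrayType(f, _)) => isCompatible(r, f)
      case (MapType(rk, rv, _), MapType(fk, fv, _)) => isCompatible(rk, fk) && isCompatible(rv, fv)
      case (StructType(rf), StructType(ff)) =>
        rf.length == ff.length && rf.zip(ff).forall { case (a, b) => isCompatible(a.dataType, b.dataType) }
      case _ => false
    }
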
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index f500ea83120d..06892a95086a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -114,6 +114,63 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }
 
+  @Test
+  def testWriteTimestampLogicalType(): Unit = {
+    val avroSchemaStr =
+      """
+        |{
+        |  "type": "record",
+        |  "name": "HudiRecord",
+        |  "fields": [
+        |    {"name": "id", "type": "int"},
+        |    {"name": "name", "type": ["null", "string"]},
+        |    {"name": "ts_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
+        |    {"name": "value", "type": ["null", "double"]},
+        |    {"name": "category", "type": ["null", "string"]}
+        |  ]
+        |}
+        |""".stripMargin
+    spark.sql("DROP TABLE IF EXISTS source").show(false)
+    spark.sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS source (
+         |  id INT,
+         |  name STRING,
+         |  ts_millis BIGINT,
+         |  value DOUBLE,
+         |  category STRING
+         |)
+         |USING parquet
+         |""".stripMargin).show(false)
+    spark.sql(
+      """
+        |INSERT INTO source values
+        | (1, "Alice", 1678886400000L, 10.5, "A"),
+        | (2, "Bob", 1678886401000L, 20.2, "B"),
+        | (3, "Charlie", 1678886402000L, 15.8, "A"),
+        | (4, "David", 1678886403000L, 25.9, "C"),
+        | (5, "Eve", 1678886404000L, 12.1, "B")
+        |""".stripMargin).show(false)
+    val df = spark.sql("select id, name, ts_millis, value, category from source")
+    //spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
+    //spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
+    df.write.format("hudi")
+      .option("hoodie.table.name", "test_table")
+      .option("hoodie.write.schema", avroSchemaStr)
+      .option("hoodie.datasource.write.recordkey.field", "id")
+      .option("hoodie.datasource.write.precombine.field", "ts_millis")
+      .mode(SaveMode.Overwrite)
+      .save("/tmp/hudi_table_test3")
+    df.printSchema()
+  }
+
+  @Test
+  def testReadTimestampLogicalType(): Unit = {
+    spark.read.format("hudi")
+      .load("/tmp/hudi_table_test3")
+      .show(false)
+  }
+
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
   def testNoPrecombine(recordType: HoodieRecordType) {
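
testWriteTimestampLogicalType above declares ts_millis as an Avro timestamp-millis logical type through hoodie.write.schema. A minimal sketch of checking the round trip on read; the path matches the test, timestamp_millis() is a Spark 3.1+ SQL function, and the literal is the first row's epoch millis:

    import org.apache.spark.sql.functions.{col, expr}

    // Read back the table written by the test and compare against a known instant.
    val out = spark.read.format("hudi").load("/tmp/hudi_table_test3")
    out.select(col("id"), col("ts_millis"),
        expr("timestamp_millis(1678886400000)").as("expected_first_ts"))
      .orderBy("id")
      .show(false)
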
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 63adacbf1292..7e4216cf903d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -1236,4 +1236,87 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
       })
     }
   }
+
+  test("Test MergeInto with changing partition and global index") {
+    Seq(true).foreach { updatePartitionPathEnabled =>
+      withTempDir { tmp =>
+        withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE",
+          "hoodie.simple.index.update.partition.path" -> updatePartitionPathEnabled.toString) {
+          Seq("mor").foreach { tableType =>
+            val targetTable = generateTableName
+            spark.sql(
+              s"""
+                 | create table $targetTable (
+                 |  id int,
+                 |  version int,
+                 |  mergeCond string,
+                 |  partition string
+                 | ) using hudi
+                 | partitioned by (partition)
+                 | tblproperties (
+                 |    'primaryKey' = 'id',
+                 |    'type' = '$tableType',
+                 |    'payloadClass' = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
+                 |    'payloadType' = 'CUSTOM',
+                 |    preCombineField = 'version'
+                 | )
+                 | location '${tmp.getCanonicalPath}/$targetTable'
+             """.stripMargin)
+
+            spark.sql(s"insert into $targetTable values(1, 1, 'insert', '2023-10-01')")
+            spark.sql(s"insert into $targetTable values(2, 1, 'insert', '2023-10-01')")
+            spark.sql(s"insert into $targetTable values(3, 1, 'insert', '2023-10-01')")
+            spark.sql(s"insert into $targetTable values(4, 1, 'insert', '2023-10-01')")
+            spark.sql(s"insert into $targetTable values(5, 1, 'insert', '2023-10-01')")
+            spark.sql(s"insert into $targetTable values(6, 1, 'insert', '2023-10-01')")
+
+            val sourceTable = generateTableName
+            spark.sql(
+              s"""
+                 | create table $sourceTable (
+                 | id int,
+                 | version int,
+                 | mergeCond string,
+                 | partition string
+                 | ) using parquet
+                 | partitioned by (partition)
+                 | location '${tmp.getCanonicalPath}/$sourceTable'
+            """.stripMargin)
+
+            //merge cond matches and partition is changed
+            spark.sql(s"insert into $sourceTable values(1, 2, 'yes', '2023-10-02')")
+            //merge cond does not match and partition is changed
+            spark.sql(s"insert into $sourceTable values(2, 2, 'no', '2023-10-02')")
+            //merge cond matches and partition is not changed
+            spark.sql(s"insert into $sourceTable values(3, 2, 'yes', '2023-10-01')")
+            //merge cond does not match and partition is not changed
+            spark.sql(s"insert into $sourceTable values(4, 2, 'no', '2023-10-01')")
+            //merge cond is delete and partition is changed
+            spark.sql(s"insert into $sourceTable values(5, 2, 'delete', '2023-10-02')")
+            //merge cond is delete and partition is not changed
+            spark.sql(s"insert into $sourceTable values(6, 2, 'delete', '2023-10-01')")
+            //id does not match
+            spark.sql(s"insert into $sourceTable values(7, 1, 'insert', '2023-10-01')")
+
+            spark.sql(
+              s"""
+                 | merge into $targetTable t using
+                 | (select * from $sourceTable) as s
+                 | on t.id=s.id
+                 | when matched and s.mergeCond = 'yes' then update set *
+                 | when matched and s.mergeCond = 'delete' then delete
+                 | when not matched then insert *
+             """.stripMargin)
+            val updatedPartitionPath = if (updatePartitionPathEnabled) "partition=2023-10-02" else "partition=2023-10-01"
+            checkAnswer(s"select id,version,_hoodie_partition_path from $targetTable order by id")(
+              Seq(1, 2, updatedPartitionPath),
+              Seq(2, 1, "partition=2023-10-01"),
+              Seq(3, 2, "partition=2023-10-01"),
+              Seq(4, 1, "partition=2023-10-01"),
+              Seq(7, 1, "partition=2023-10-01"))
+          }
+        }
+      }
+    }
+  }
 }
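
The test drives the global index through session SQL conf (withSQLConf). For reference, a sketch of the same two settings passed as writer options instead; the table name and path are hypothetical, and df stands for any DataFrame with id/version/mergeCond/partition columns:

    // Hypothetical write that pins the index config per job rather than per session.
    df.write.format("hudi")
      .option("hoodie.table.name", "merge_demo")                     // hypothetical name
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "version")
      .option("hoodie.datasource.write.partitionpath.field", "partition")
      .option("hoodie.index.type", "GLOBAL_SIMPLE")
      .option("hoodie.simple.index.update.partition.path", "true")
      .mode("append")
      .save("/tmp/merge_demo")                                       // hypothetical path
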
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 60ed1b6732a5..a3a56a80360b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -457,6 +457,32 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertEquals(configFlag, Boolean.parseBoolean(metaClient.getTableConfig().getUrlEncodePartitioning()));
   }
 
+  @Test
+  public void testBootstrap() throws IOException {
+    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
+    String newDatasetBasePath = "/Users/ethan/Work/tmp/20250311-orc-bootstrap/tmp2/hudi_table";
+    cfg.runBootstrap = true;
+    cfg.tableType = "COPY_ON_WRITE";
+    cfg.targetTableName = "bootstrap_table";
+    cfg.baseFileFormat = "orc";
+    cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", "/Users/ethan/Work/tmp/20250311-orc-bootstrap/tmp2/orc_input"));
+    //cfg.configs.add("hoodie.datasource.write.partitionpath.field=");
+    cfg.configs.add("hoodie.datasource.write.recordkey.field=uuid");
+    cfg.configs.add("hoodie.datasource.write.precombine.field=ts");
+    cfg.configs.add("hoodie.datasource.write.partitionpath.field=city");
+    cfg.configs.add(String.format("hoodie.datasource.write.keygenerator.class=%s", SimpleKeyGenerator.class.getName()));
+    cfg.configs.add("hoodie.datasource.write.hive_style_partitioning=true");
+    cfg.configs.add("hoodie.bootstrap.mode.selector.regex.mode=METADATA_ONLY");
+    cfg.configs.add("hoodie.table.base.file.format=orc");
+    cfg.configs.add("hoodie.bootstrap.parallelism=5");
+    cfg.targetBasePath = newDatasetBasePath;
+    try {
+      new HoodieDeltaStreamer(cfg, jsc).sync();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
   public void testBulkInsertsAndUpsertsWithBootstrap(HoodieRecordType recordType) throws Exception {
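
testBootstrap above hardwires machine-local paths. A Scala sketch of the same METADATA_ONLY bootstrap wiring with placeholder paths; the config keys mirror the patch, and jsc is assumed to be the test's existing JavaSparkContext:

    import org.apache.hudi.keygen.SimpleKeyGenerator
    import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer

    val cfg = new HoodieDeltaStreamer.Config()
    cfg.runBootstrap = true
    cfg.tableType = "COPY_ON_WRITE"
    cfg.targetTableName = "bootstrap_table"
    cfg.baseFileFormat = "orc"
    cfg.targetBasePath = "/tmp/bootstrap/hudi_table"                        // placeholder
    cfg.configs.add("hoodie.bootstrap.base.path=/tmp/bootstrap/orc_input")  // placeholder
    cfg.configs.add("hoodie.datasource.write.recordkey.field=uuid")
    cfg.configs.add("hoodie.datasource.write.precombine.field=ts")
    cfg.configs.add("hoodie.datasource.write.partitionpath.field=city")
    cfg.configs.add(s"hoodie.datasource.write.keygenerator.class=${classOf[SimpleKeyGenerator].getName}")
    cfg.configs.add("hoodie.bootstrap.mode.selector.regex.mode=METADATA_ONLY")
    new HoodieDeltaStreamer(cfg, jsc).sync()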
