This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.14.1-fix-timestamp-millis
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e6ee3fe24be262693f05e65dfcb2d096ddded9d9
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Mar 12 07:31:49 2025 -0700

    WIP
---
 .../hudi/testutils/HoodieClientTestUtils.java      | 10 +++
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  9 ++-
 .../parquet/HoodieParquetFileFormatHelper.scala    |  4 +-
 .../apache/hudi/functional/TestCOWDataSource.scala | 57 +++++++++++++++
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 83 ++++++++++++++++++++++
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 26 +++++++
 6 files changed, 187 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index a7808ea93824..ddbf3428dede 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -97,6 +97,16 @@ public class HoodieClientTestUtils {
         .setMaster("local[4]")
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
+        //.set("spark.sql.avro.datetimeRebaseModeInRead", "LEGACY")
+        //.set("spark.sql.avro.datetimeRebaseModeInWrite", "LEGACY")
+        //.set("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "LEGACY")
+        //.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
+        //.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
+        //.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")
+        //.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")
+        //.set("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY")
+        //.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
+        //.set("spark.sql.parquet.writeLegacyFormat", "true")
         .set("spark.sql.shuffle.partitions", "4")
         .set("spark.default.parallelism", "4");

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 0098ee54c2bc..364bb2f8e5db 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -180,7 +180,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       }
     }

-    (avroSchema, internalSchemaOpt)
+    if (avroSchema.getName.equals("HudiRecord")) {
+      val newSchema = new Schema.Parser().parse(
+        "{\"type\":\"record\",\"name\":\"HudiRecord\",\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null},{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null},{\"name\":\"_hoodie_record_key\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null},{\"name\":\"_hoodie_partition_path\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null},{\"name\":\"_hoodie_file_name\",\"type\ [...]
+      )
+      (newSchema, internalSchemaOpt)
+    } else {
+      (avroSchema, internalSchemaOpt)
+    }
   }

   protected lazy val tableStructSchema: StructType = {
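Note: the hardcoded "HudiRecord" schema above is truncated in this message ([...]), so its exact shape cannot be confirmed here. Judging by the branch name and the Parquet helper change below, the intent appears to be stripping the timestamp-millis logical type down to a plain long. A sketch of the same rewrite done programmatically rather than via a fixed string — stripTimestampMillis is a hypothetical helper, not code from this commit; nullable unions are skipped for brevity; assumes the Avro 1.9+ Schema.Field constructor:

    import org.apache.avro.Schema
    import scala.collection.JavaConverters._

    // Rebuild a record schema, replacing any long field that carries the
    // timestamp-millis logical type with a plain long, so the reader-side
    // required type lines up with the (LongType, TimestampType) equivalence
    // added in HoodieParquetFileFormatHelper below.
    def stripTimestampMillis(schema: Schema): Schema = {
      val fields = schema.getFields.asScala.map { f =>
        val ft = f.schema()
        val isTsMillis = ft.getType == Schema.Type.LONG &&
          Option(ft.getLogicalType).exists(_.getName == "timestamp-millis")
        val rewritten = if (isTsMillis) Schema.create(Schema.Type.LONG) else ft
        new Schema.Field(f.name(), rewritten, f.doc(), f.defaultVal())
      }.asJava
      Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace,
        schema.isError, fields)
    }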
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
index 599bbebe4f6c..6493ce0e3083 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet

 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.metadata.FileMetaData
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType, StructField, StructType, TimestampType}

 object HoodieParquetFileFormatHelper {

@@ -45,6 +45,8 @@ object HoodieParquetFileFormatHelper {
   }

   def isDataTypeEqual(requiredType: DataType, fileType: DataType): Boolean = (requiredType, fileType) match {
+    case (LongType, TimestampType) => true
+
     case (requiredType, fileType) if requiredType == fileType => true
     case (ArrayType(rt, _), ArrayType(ft, _)) =>
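For clarity, a minimal sketch of what the added case admits (the helper object is public, so this is directly callable from test code):

    import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper
    import org.apache.spark.sql.types.{LongType, TimestampType}

    // The required (reader) type comes first, the Parquet file's type second:
    // a long required type now accepts a file column stored as timestamp, which
    // is exactly the shape an Avro timestamp-millis field produces once its
    // logical type is stripped. Note the case is one-directional:
    // (TimestampType, LongType) is not special-cased.
    assert(HoodieParquetFileFormatHelper.isDataTypeEqual(LongType, TimestampType))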
.option("hoodie.datasource.write.recordkey.field", "id") + .option("hoodie.datasource.write.precombine.field", "ts_millis") + .mode(SaveMode.Overwrite) + .save("/tmp/hudi_table_test3") + df.printSchema() + } + + @Test + def testReadTimestampLogicalType(): Unit = { + spark.read.format("hudi") + .load("/tmp/hudi_table_test3") + .show(false) + } + @ParameterizedTest @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) def testNoPrecombine(recordType: HoodieRecordType) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala index 63adacbf1292..7e4216cf903d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala @@ -1236,4 +1236,87 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo }) } } + + test("Test MergeInto with changing partition and global index") { + Seq(true).foreach { updatePartitionPathEnabled => + withTempDir { tmp => + withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE", + "hoodie.simple.index.update.partition.path" -> updatePartitionPathEnabled.toString) { + Seq("mor").foreach { tableType => + val targetTable = generateTableName + spark.sql( + s""" + | create table $targetTable ( + | id int, + | version int, + | mergeCond string, + | partition string + | ) using hudi + | partitioned by (partition) + | tblproperties ( + | 'primaryKey' = 'id', + | 'type' = '$tableType', + | 'payloadClass' = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload', + | 'payloadType' = 'CUSTOM', + | preCombineField = 'version' + | ) + | location '${tmp.getCanonicalPath}/$targetTable' + """.stripMargin) + + spark.sql(s"insert into $targetTable values(1, 1, 'insert', '2023-10-01')") + spark.sql(s"insert into $targetTable values(2, 1, 'insert', '2023-10-01')") + spark.sql(s"insert into $targetTable values(3, 1, 'insert', '2023-10-01')") + spark.sql(s"insert into $targetTable values(4, 1, 'insert', '2023-10-01')") + spark.sql(s"insert into $targetTable values(5, 1, 'insert', '2023-10-01')") + spark.sql(s"insert into $targetTable values(6, 1, 'insert', '2023-10-01')") + + val sourceTable = generateTableName + spark.sql( + s""" + | create table $sourceTable ( + | id int, + | version int, + | mergeCond string, + | partition string + | ) using parquet + | partitioned by (partition) + | location '${tmp.getCanonicalPath}/$sourceTable' + """.stripMargin) + + //merge cond matches and partition is changed + spark.sql(s"insert into $sourceTable values(1, 2, 'yes', '2023-10-02')") + //merge cond does not match and partition is changed + spark.sql(s"insert into $sourceTable values(2, 2, 'no', '2023-10-02')") + //merge cond matches and partition is not changed + spark.sql(s"insert into $sourceTable values(3, 2, 'yes', '2023-10-01')") + //merge cond does not match and partition is not changed + spark.sql(s"insert into $sourceTable values(4, 2, 'no', '2023-10-01')") + //merge cond is delete and partition is changed + spark.sql(s"insert into $sourceTable values(5, 2, 'delete', '2023-10-02')") + //merge cond is delete and partition is not changed + spark.sql(s"insert into $sourceTable values(6, 2, 'delete', '2023-10-01')") + //id does not match + spark.sql(s"insert into $sourceTable values(7, 1, 'insert', '2023-10-01')") + + spark.sql( + s""" + | 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 63adacbf1292..7e4216cf903d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -1236,4 +1236,87 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
     })
   }
 }
+
+  test("Test MergeInto with changing partition and global index") {
+    Seq(true).foreach { updatePartitionPathEnabled =>
+      withTempDir { tmp =>
+        withSQLConf("hoodie.index.type" -> "GLOBAL_SIMPLE",
+          "hoodie.simple.index.update.partition.path" -> updatePartitionPathEnabled.toString) {
+          Seq("mor").foreach { tableType =>
+            val targetTable = generateTableName
+            spark.sql(
+              s"""
+                 | create table $targetTable (
+                 |   id int,
+                 |   version int,
+                 |   mergeCond string,
+                 |   partition string
+                 | ) using hudi
+                 | partitioned by (partition)
+                 | tblproperties (
+                 |   'primaryKey' = 'id',
+                 |   'type' = '$tableType',
+                 |   'payloadClass' = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
+                 |   'payloadType' = 'CUSTOM',
+                 |   preCombineField = 'version'
+                 | )
+                 | location '${tmp.getCanonicalPath}/$targetTable'
+               """.stripMargin)
+
+            spark.sql(s"insert into $targetTable values(1, 1, 'insert', '2023-10-01')")
+            spark.sql(s"insert into $targetTable values(2, 1, 'insert', '2023-10-01')")
+            spark.sql(s"insert into $targetTable values(3, 1, 'insert', '2023-10-01')")
+            spark.sql(s"insert into $targetTable values(4, 1, 'insert', '2023-10-01')")
+            spark.sql(s"insert into $targetTable values(5, 1, 'insert', '2023-10-01')")
+            spark.sql(s"insert into $targetTable values(6, 1, 'insert', '2023-10-01')")
+
+            val sourceTable = generateTableName
+            spark.sql(
+              s"""
+                 | create table $sourceTable (
+                 |   id int,
+                 |   version int,
+                 |   mergeCond string,
+                 |   partition string
+                 | ) using parquet
+                 | partitioned by (partition)
+                 | location '${tmp.getCanonicalPath}/$sourceTable'
+               """.stripMargin)
+
+            //merge cond matches and partition is changed
+            spark.sql(s"insert into $sourceTable values(1, 2, 'yes', '2023-10-02')")
+            //merge cond does not match and partition is changed
+            spark.sql(s"insert into $sourceTable values(2, 2, 'no', '2023-10-02')")
+            //merge cond matches and partition is not changed
+            spark.sql(s"insert into $sourceTable values(3, 2, 'yes', '2023-10-01')")
+            //merge cond does not match and partition is not changed
+            spark.sql(s"insert into $sourceTable values(4, 2, 'no', '2023-10-01')")
+            //merge cond is delete and partition is changed
+            spark.sql(s"insert into $sourceTable values(5, 2, 'delete', '2023-10-02')")
+            //merge cond is delete and partition is not changed
+            spark.sql(s"insert into $sourceTable values(6, 2, 'delete', '2023-10-01')")
+            //id does not match
+            spark.sql(s"insert into $sourceTable values(7, 1, 'insert', '2023-10-01')")
+
+            spark.sql(
+              s"""
+                 | merge into $targetTable t using
+                 | (select * from $sourceTable) as s
+                 | on t.id=s.id
+                 | when matched and s.mergeCond = 'yes' then update set *
+                 | when matched and s.mergeCond = 'delete' then delete
+                 | when not matched then insert *
+               """.stripMargin)
+            val updatedPartitionPath = if (updatePartitionPathEnabled) "partition=2023-10-02" else "partition=2023-10-01"
+            checkAnswer(s"select id,version,_hoodie_partition_path from $targetTable order by id")(
+              Seq(1, 2, updatedPartitionPath),
+              Seq(2, 1, "partition=2023-10-01"),
+              Seq(3, 2, "partition=2023-10-01"),
+              Seq(4, 1, "partition=2023-10-01"),
+              Seq(7, 1, "partition=2023-10-01"))
+          }
+        }
+      }
+    }
+  }
 }

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 60ed1b6732a5..a3a56a80360b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -457,6 +457,32 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertEquals(configFlag, Boolean.parseBoolean(metaClient.getTableConfig().getUrlEncodePartitioning()));
   }

+  @Test
+  public void testBootstrap() throws IOException {
+    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
+    String newDatasetBasePath = "/Users/ethan/Work/tmp/20250311-orc-bootstrap/tmp2/hudi_table";
+    cfg.runBootstrap = true;
+    cfg.tableType = "COPY_ON_WRITE";
+    cfg.targetTableName = "bootstrap_table";
+    cfg.baseFileFormat = "orc";
+    cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", "/Users/ethan/Work/tmp/20250311-orc-bootstrap/tmp2/orc_input"));
+    //cfg.configs.add("hoodie.datasource.write.partitionpath.field=");
+    cfg.configs.add("hoodie.datasource.write.recordkey.field=uuid");
+    cfg.configs.add("hoodie.datasource.write.precombine.field=ts");
+    cfg.configs.add("hoodie.datasource.write.partitionpath.field=city");
+    cfg.configs.add(String.format("hoodie.datasource.write.keygenerator.class=%s", SimpleKeyGenerator.class.getName()));
+    cfg.configs.add("hoodie.datasource.write.hive_style_partitioning=true");
+    cfg.configs.add("hoodie.bootstrap.mode.selector.regex.mode=METADATA_ONLY");
+    cfg.configs.add("hoodie.table.base.file.format=orc");
+    cfg.configs.add("hoodie.bootstrap.parallelism=5");
+    cfg.targetBasePath = newDatasetBasePath;
+    try {
+      new HoodieDeltaStreamer(cfg, jsc).sync();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
   public void testBulkInsertsAndUpsertsWithBootstrap(HoodieRecordType recordType) throws Exception {
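testBootstrap above only runs sync() against hardcoded local paths and asserts nothing. A sketch of a minimal follow-up check — paths copied from the test; assumes the METADATA_ONLY-bootstrapped table is readable through the regular Hudi datasource:

    // Row counts should line up: METADATA_ONLY bootstrap leaves the data in the
    // original ORC files and only generates skeleton metadata alongside them.
    val bootstrapped = spark.read.format("hudi")
      .load("/Users/ethan/Work/tmp/20250311-orc-bootstrap/tmp2/hudi_table")
    val rawOrc = spark.read.format("orc")
      .load("/Users/ethan/Work/tmp/20250311-orc-bootstrap/tmp2/orc_input")
    assert(bootstrapped.count() == rawOrc.count())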
