prashantwason commented on code in PR #17776:
URL: https://github.com/apache/hudi/pull/17776#discussion_r2770799347
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -2259,6 +2259,74 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
.mode(SaveMode.Append)
.save(basePath)
}
+
+ /**
+ * Test that incremental reads work on MOR tables when the data schema contains fields
+ * with the same name as Hudi meta fields (e.g., _hoodie_partition_path). This tests
+ * the fix that filters out duplicate fields when merging skeleton schema with data
+ * schema in IncrementalRelation.
+ *
+ * Without the fix, this would fail with:
+ * org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema
+ */
+ @Test
+ def testIncrementalReadWithDuplicateMetaFieldInDataSchema(): Unit = {
+ val _spark = spark
+ import _spark.implicits._
+
+ // Create a DataFrame with a column that has the same name as a Hudi meta field
+ val df = Seq(
+ ("row1", "partition1", 1000L, "value1"),
+ ("row2", "partition1", 1001L, "value2"),
+ ("row3", "partition2", 1002L, "value3")
+ ).toDF("_row_key", "_hoodie_partition_path", "timestamp", "data")
+
+ val writeOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "_hoodie_partition_path",
+ HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_mor_dup_meta_field"
+ )
+
+ // Write initial data
+ df.write.format("hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
Review Comment:
Done. Added an upsert operation after the initial insert to create log files in the MOR table. The test now includes:
- Initial insert (creates base files)
- Upsert (creates log files with an update to row1 and a new row4)
- Incremental read that verifies both base files and log files are correctly read
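
The steps described above can be sketched as follows. This is a sketch, not the literal test code from the PR: it reuses `writeOpts`, `basePath`, and `spark` from the diff, and the incremental-read option names (`DataSourceReadOptions.QUERY_TYPE`, `BEGIN_INSTANTTIME`) are the standard Hudi datasource options, which may differ slightly across Hudi versions (newer releases also accept `START_COMMIT`):

```scala
// Sketch only: assumes writeOpts, basePath, and spark from the test above.
// Upsert after the initial insert: updates row1 and adds row4, producing
// log files alongside the base files in the MOR table.
val updates = Seq(
  ("row1", "partition1", 2000L, "value1_updated"), // update to an existing row
  ("row4", "partition2", 2001L, "value4")          // brand-new row
).toDF("_row_key", "_hoodie_partition_path", "timestamp", "data")

updates.write.format("hudi")
  .options(writeOpts)
  .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save(basePath)

// Incremental read from the start of the timeline. Without the fix in
// IncrementalRelation, merging the skeleton schema with a data schema that
// already contains _hoodie_partition_path raised an AnalysisException here.
val incremental = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")
  .load(basePath)

// Both base-file rows and log-file rows should be visible: 4 rows total.
assert(incremental.count() == 4)
```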
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]