This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cc95b03b5ef [HUDI-7002] Fixing initializing RLI MDT partition for non-partitioned dataset (#9938)
cc95b03b5ef is described below

commit cc95b03b5ef69ee585e0f781b6ad29cf5e770469
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Fri Nov 3 10:21:04 2023 -0400

    [HUDI-7002] Fixing initializing RLI MDT partition for non-partitioned dataset (#9938)
---
 .../hudi/metadata/HoodieTableMetadataUtil.java | 2 +-
 .../hudi/functional/TestRecordLevelIndex.scala | 36 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 6502f8c246d..6808a0ef8dc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1738,7 +1738,7 @@ public class HoodieTableMetadataUtil {
       final String partition = partitionAndBaseFile.getKey();
       final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
       final String filename = baseFile.getFileName();
-      Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename);
+      Path dataFilePath = new Path(basePath, StringUtils.isNullOrEmpty(partition) ? filename : (partition + Path.SEPARATOR) + filename);
       final String fileId = baseFile.getFileId();
       final String instantTime = baseFile.getCommitTime();
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 17e3cadeeff..56866e7bf40 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -26,13 +26,15 @@ import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.config._
 import org.apache.hudi.exception.HoodieWriteConflictException
+import org.apache.hudi.functional.TestCOWDataSourceStorage.{SQL_DRIVER_IS_NOT_NULL, SQL_DRIVER_IS_NULL, SQL_QUERY_EQUALITY_VALIDATOR_CLASS_NAME, SQL_QUERY_INEQUALITY_VALIDATOR_CLASS_NAME, SQL_RIDER_IS_NOT_NULL, SQL_RIDER_IS_NULL}
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, MetadataPartitionType}
 import org.apache.hudi.util.JavaConversions
 import org.apache.spark.sql._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api._
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
+import org.junit.jupiter.params.provider.Arguments.arguments
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource}
 
 import java.util.Collections
 import java.util.concurrent.Executors
@@ -65,6 +67,18 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
       saveMode = SaveMode.Append)
   }
 
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testRLIUpsertNonPartitioned(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts - PARTITIONPATH_FIELD.key + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+  }
+
   @ParameterizedTest
   @CsvSource(Array("COPY_ON_WRITE,true", "COPY_ON_WRITE,false", "MERGE_ON_READ,true", "MERGE_ON_READ,false"))
   def testRLIBulkInsertThenInsertOverwrite(tableType: HoodieTableType, enableRowWriter: Boolean): Unit = {
@@ -335,12 +349,16 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
   }
 
   @ParameterizedTest
-  @EnumSource(classOf[HoodieTableType])
-  def testEnableDisableRLI(tableType: HoodieTableType): Unit = {
+  @MethodSource(Array("testEnableDisableRLIParams"))
+  def testEnableDisableRLI(tableType: HoodieTableType, isPartitioned: Boolean): Unit = {
     var hudiOpts = commonOpts ++ Map(
       DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name()
     )
 
+    if (!isPartitioned) {
+      hudiOpts = hudiOpts - PARTITIONPATH_FIELD.key
+    }
+
     doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Overwrite)
@@ -470,3 +488,15 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
     validateDataAndRecordIndices(hudiOpts)
   }
 }
+
+object TestRecordLevelIndex {
+
+  def testEnableDisableRLIParams(): java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      arguments(HoodieTableType.COPY_ON_WRITE, new java.lang.Boolean(false)),
+      arguments(HoodieTableType.COPY_ON_WRITE, new java.lang.Boolean(true)),
+      arguments(HoodieTableType.MERGE_ON_READ, new java.lang.Boolean(false)),
+      arguments(HoodieTableType.MERGE_ON_READ, new java.lang.Boolean(true))
+    )
+  }
+}