This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cc95b03b5ef [HUDI-7002] Fixing initializing RLI MDT partition for non-partitioned dataset (#9938)
cc95b03b5ef is described below

commit cc95b03b5ef69ee585e0f781b6ad29cf5e770469
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Fri Nov 3 10:21:04 2023 -0400

    [HUDI-7002] Fixing initializing RLI MDT partition for non-partitioned dataset (#9938)
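
Background on the one-line fix below (a plausible reading; the commit message does not spell out the mechanism): for a non-partitioned table the relative partition path is empty, so the old code built the child path as "" + Path.SEPARATOR + filename. Hadoop's Path(parent, child) resolves the child against the parent via URI resolution, and a child that starts with the separator is treated as an absolute path, so the result escapes the table's base path. A minimal sketch with hypothetical table and file names, assuming org.apache.hadoop.fs.Path on the classpath:

    import org.apache.hadoop.fs.Path

    // Hypothetical names; not part of the commit.
    object EmptyPartitionPathDemo {
      def main(args: Array[String]): Unit = {
        val basePath = new Path("/tmp/hudi_table")
        val filename = "f1_0-0-0_001.parquet"

        // Partitioned: a non-empty partition resolves under basePath as expected.
        println(new Path(basePath, "2023/11/03" + Path.SEPARATOR + filename))
        // /tmp/hudi_table/2023/11/03/f1_0-0-0_001.parquet

        // Non-partitioned: the partition is "", so the child starts with the
        // separator; Hadoop's URI-based resolution treats it as absolute and
        // the base path is dropped.
        println(new Path(basePath, "" + Path.SEPARATOR + filename))
        // /f1_0-0-0_001.parquet

        // The commit's guard: omit the separator when the partition is empty.
        println(new Path(basePath, filename))
        // /tmp/hudi_table/f1_0-0-0_001.parquet
      }
    }
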
---
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  2 +-
 .../hudi/functional/TestRecordLevelIndex.scala     | 36 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 6502f8c246d..6808a0ef8dc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1738,7 +1738,7 @@ public class HoodieTableMetadataUtil {
       final String partition = partitionAndBaseFile.getKey();
       final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
       final String filename = baseFile.getFileName();
-      Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename);
+      Path dataFilePath = new Path(basePath, StringUtils.isNullOrEmpty(partition) ? filename : (partition + Path.SEPARATOR) + filename);
 
       final String fileId = baseFile.getFileId();
       final String instantTime = baseFile.getCommitTime();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 17e3cadeeff..56866e7bf40 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -26,13 +26,15 @@ import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.config._
 import org.apache.hudi.exception.HoodieWriteConflictException
+import org.apache.hudi.functional.TestCOWDataSourceStorage.{SQL_DRIVER_IS_NOT_NULL, SQL_DRIVER_IS_NULL, SQL_QUERY_EQUALITY_VALIDATOR_CLASS_NAME, SQL_QUERY_INEQUALITY_VALIDATOR_CLASS_NAME, SQL_RIDER_IS_NOT_NULL, SQL_RIDER_IS_NULL}
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, MetadataPartitionType}
 import org.apache.hudi.util.JavaConversions
 import org.apache.spark.sql._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api._
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
+import org.junit.jupiter.params.provider.Arguments.arguments
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource}
 
 import java.util.Collections
 import java.util.concurrent.Executors
@@ -65,6 +67,18 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
       saveMode = SaveMode.Append)
   }
 
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testRLIUpsertNonPartitioned(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts - PARTITIONPATH_FIELD.key + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+  }
+
   @ParameterizedTest
   @CsvSource(Array("COPY_ON_WRITE,true", "COPY_ON_WRITE,false", "MERGE_ON_READ,true", "MERGE_ON_READ,false"))
   def testRLIBulkInsertThenInsertOverwrite(tableType: HoodieTableType, enableRowWriter: Boolean): Unit = {
@@ -335,12 +349,16 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
   }
 
   @ParameterizedTest
-  @EnumSource(classOf[HoodieTableType])
-  def testEnableDisableRLI(tableType: HoodieTableType): Unit = {
+  @MethodSource(Array("testEnableDisableRLIParams"))
+  def testEnableDisableRLI(tableType: HoodieTableType, isPartitioned: Boolean): Unit = {
     var hudiOpts = commonOpts ++ Map(
       DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name()
     )
 
+    if (!isPartitioned) {
+      hudiOpts = hudiOpts - PARTITIONPATH_FIELD.key
+    }
+
     doWriteAndValidateDataAndRecordIndex(hudiOpts,
       operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       saveMode = SaveMode.Overwrite)
@@ -470,3 +488,15 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
     validateDataAndRecordIndices(hudiOpts)
   }
 }
+
+object TestRecordLevelIndex {
+
+  def testEnableDisableRLIParams(): java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      arguments(HoodieTableType.COPY_ON_WRITE, new java.lang.Boolean(false)),
+      arguments(HoodieTableType.COPY_ON_WRITE, new java.lang.Boolean(true)),
+      arguments(HoodieTableType.MERGE_ON_READ, new java.lang.Boolean(false)),
+      arguments(HoodieTableType.MERGE_ON_READ, new java.lang.Boolean(true))
+    )
+  }
+}
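
A note on the test change above: JUnit 5 resolves @MethodSource against static methods, and scalac emits static forwarders for companion-object methods, which is presumably why the new argument provider lives in object TestRecordLevelIndex rather than in the test class itself. A minimal standalone sketch of the same pattern, with hypothetical names:

    import java.util.stream.{Stream => JStream}

    import org.junit.jupiter.api.Assertions.assertNotNull
    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.Arguments.arguments
    import org.junit.jupiter.params.provider.{Arguments, MethodSource}

    class CompanionMethodSourceExample {
      @ParameterizedTest
      @MethodSource(Array("exampleParams")) // found via the companion's static forwarder
      def exampleTest(tableType: String, isPartitioned: java.lang.Boolean): Unit = {
        assertNotNull(tableType)
        assertNotNull(isPartitioned)
      }
    }

    object CompanionMethodSourceExample {
      // Each Arguments row supplies one invocation of the test above.
      def exampleParams(): JStream[Arguments] = JStream.of(
        arguments("COPY_ON_WRITE", java.lang.Boolean.TRUE),
        arguments("MERGE_ON_READ", java.lang.Boolean.FALSE)
      )
    }
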
