This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new af4c5abd123  [HUDI-6927] CDC file clean not work (#9841)
af4c5abd123 is described below

commit af4c5abd123dbea476377aaa1996640fc39142ed
Author: zhuanshenbsj1 <34104400+zhuanshenb...@users.noreply.github.com>
AuthorDate: Thu Oct 12 19:35:00 2023 +0800

     [HUDI-6927] CDC file clean not work (#9841)
---
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  4 +-
 .../hudi/functional/cdc/HoodieCDCTestBase.scala    |  7 +++
 .../functional/cdc/TestCDCDataFrameSuite.scala     | 65 ++++++++++++++++++++++
 3 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 04aa72303da..68fa84ffc8d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -392,7 +392,9 @@ public class HoodieTableMetadataUtil {
 
                         Map<String, Long> cdcPathAndSizes = stat.getCdcStats();
                         if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) {
-                          map.putAll(cdcPathAndSizes);
+                          cdcPathAndSizes.entrySet().forEach(cdcEntry -> {
+                            map.put(FSUtils.getFileName(cdcEntry.getKey(), partitionStatName), cdcEntry.getValue());
+                          });
                         }
                         return map;
                       },
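For context, here is a minimal Scala sketch of the key normalization the hunk above performs, assuming CDC stats are keyed by partition-relative paths while the metadata table's FILES records expect bare file names within each partition; toFileName is a hypothetical stand-in for FSUtils.getFileName:

    // CDC stats arrive keyed by partition-relative paths, e.g.
    // "2023/10/12/.f1_100.log.1_0-1-0.cdc"; the metadata table stores bare
    // file names per partition, so the partition prefix must be stripped.
    def toFileName(relativePath: String, partition: String): String =
      if (partition.isEmpty) relativePath
      else relativePath.stripPrefix(partition).stripPrefix("/")

    val cdcStats = Map("2023/10/12/.f1_100.log.1_0-1-0.cdc" -> 1024L)
    val normalized = cdcStats.map { case (path, size) =>
      toFileName(path, "2023/10/12") -> size
    }
    // normalized == Map(".f1_100.log.1_0-1-0.cdc" -> 1024L)

Putting the full relative path into the map, as the removed putAll did, presumably left CDC entries that never matched the file names the cleaner resolves, which is why the CDC log files were not cleaned.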
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
index dfca644e345..10b13478559 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNull}
 
+import java.util.function.Predicate
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
@@ -115,6 +116,12 @@ abstract class HoodieCDCTestBase extends HoodieSparkClientTestBase {
     commitMetadata.getWriteStats.asScala.flatMap(_.getCdcStats.keys).toList
   }
 
+  protected def isFilesExistInFileSystem(files: List[String]): Boolean = {
+    files.stream().allMatch(new Predicate[String] {
+      override def test(file: String): Boolean = fs.exists(new Path(basePath + "/" + file))
+    })
+  }
+
   protected def getCDCBlocks(relativeLogFile: String, cdcSchema: Schema): List[HoodieDataBlock] = {
     val logFile = new HoodieLogFile(
       metaClient.getFs.getFileStatus(new Path(metaClient.getBasePathV2, relativeLogFile)))
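As a side note, the new helper above drives a Java stream with an explicit Predicate from Scala; an equivalent check over plain Scala collections could look like the sketch below, where fs and basePath are assumed to come from the enclosing test base class:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Sketch of an idiomatic-Scala equivalent of isFilesExistInFileSystem:
    // forall short-circuits on the first missing file, like allMatch.
    def allFilesExist(fs: FileSystem, basePath: String, files: List[String]): Boolean =
      files.forall(file => fs.exists(new Path(basePath + "/" + file)))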
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index aac836d8c3a..baf396f9232 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -688,4 +688,69 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
 
     assertEquals(spark.read.format("org.apache.hudi").load(basePath).count(), 2)
   }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieCDCSupplementalLoggingMode])
+  def testCDCCleanRetain(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = {
+    val options = Map(
+      "hoodie.table.cdc.enabled" -> "true",
+      "hoodie.table.cdc.supplemental.logging.mode" -> loggingMode.name(),
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+      "hoodie.delete.shuffle.parallelism" -> "1",
+      "hoodie.datasource.write.recordkey.field" -> "_row_key",
+      "hoodie.datasource.write.precombine.field" -> "timestamp",
+      "hoodie.table.name" -> ("hoodie_test" + loggingMode.name()),
+      "hoodie.clean.automatic" -> "true",
+      "hoodie.cleaner.commits.retained" -> "1"
+    )
+
+    // Insert Operation
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+
+    // Upsert Operation
+    val hoodieRecords2 = dataGen.generateUniqueUpdates("001", 50)
+    val records2 = recordsToStrings(hoodieRecords2).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(options)
+      .option("hoodie.datasource.write.operation", "upsert")
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val instant2 = metaClient.reloadActiveTimeline.lastInstant().get()
+    val cdcLogFiles2 = getCDCLogFile(instant2)
+    assertTrue(isFilesExistInFileSystem(cdcLogFiles2))
+
+    // Upsert Operation
+    val hoodieRecords3 = dataGen.generateUniqueUpdates("002", 50)
+    val records3 = recordsToStrings(hoodieRecords3).toList
+    val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+    inputDF3.write.format("org.apache.hudi")
+      .options(options)
+      .option("hoodie.datasource.write.operation", "upsert")
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // Upsert Operation
+    val hoodieRecords4 = dataGen.generateUniqueUpdates("003", 50)
+    val records4 = recordsToStrings(hoodieRecords4).toList
+    val inputDF4 = spark.read.json(spark.sparkContext.parallelize(records4, 2))
+    inputDF4.write.format("org.apache.hudi")
+      .options(options)
+      .option("hoodie.datasource.write.operation", "upsert")
+      .mode(SaveMode.Append)
+      .save(basePath)
+    assertFalse(isFilesExistInFileSystem(cdcLogFiles2))
+  }
 }
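As a usage note, with hoodie.cleaner.commits.retained set to 1 the two extra upserts make instant2 eligible for cleaning, so the assertFalse at the end checks that its CDC log files were deleted along with the replaced data files. A hedged sketch of reading the CDC change feed after such a run, using the documented Hudi datasource CDC query options:

    // Hypothetical follow-up read of the CDC feed; only changes whose CDC
    // log files survived cleaner retention can still be served.
    val cdcDf = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.query.incremental.format", "cdc")
      .option("hoodie.datasource.read.begin.instanttime", "0")
      .load(basePath)
    cdcDf.show(false)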
