This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new af4c5abd123 [HUDI-6927] CDC file clean not work (#9841) af4c5abd123 is described below commit af4c5abd123dbea476377aaa1996640fc39142ed Author: zhuanshenbsj1 <34104400+zhuanshenb...@users.noreply.github.com> AuthorDate: Thu Oct 12 19:35:00 2023 +0800 [HUDI-6927] CDC file clean not work (#9841) --- .../hudi/metadata/HoodieTableMetadataUtil.java | 4 +- .../hudi/functional/cdc/HoodieCDCTestBase.scala | 7 +++ .../functional/cdc/TestCDCDataFrameSuite.scala | 65 ++++++++++++++++++++++ 3 files changed, 75 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 04aa72303da..68fa84ffc8d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -392,7 +392,9 @@ public class HoodieTableMetadataUtil { Map<String, Long> cdcPathAndSizes = stat.getCdcStats(); if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) { - map.putAll(cdcPathAndSizes); + cdcPathAndSizes.entrySet().forEach(cdcEntry -> { + map.put(FSUtils.getFileName(cdcEntry.getKey(), partitionStatName), cdcEntry.getValue()); + }); } return map; }, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala index dfca644e345..10b13478559 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.junit.jupiter.api.{AfterEach, BeforeEach} import 
org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNull} +import java.util.function.Predicate import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -115,6 +116,12 @@ abstract class HoodieCDCTestBase extends HoodieSparkClientTestBase { commitMetadata.getWriteStats.asScala.flatMap(_.getCdcStats.keys).toList } + protected def isFilesExistInFileSystem(files: List[String]): Boolean = { + files.stream().allMatch(new Predicate[String] { + override def test(file: String): Boolean = fs.exists(new Path(basePath + "/" + file)) + }) + } + protected def getCDCBlocks(relativeLogFile: String, cdcSchema: Schema): List[HoodieDataBlock] = { val logFile = new HoodieLogFile( metaClient.getFs.getFileStatus(new Path(metaClient.getBasePathV2, relativeLogFile))) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala index aac836d8c3a..baf396f9232 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala @@ -688,4 +688,69 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { assertEquals(spark.read.format("org.apache.hudi").load(basePath).count(), 2) } + + @ParameterizedTest + @EnumSource(classOf[HoodieCDCSupplementalLoggingMode]) + def testCDCCleanRetain(loggingMode: HoodieCDCSupplementalLoggingMode): Unit = { + val options = Map( + "hoodie.table.cdc.enabled" -> "true", + "hoodie.table.cdc.supplemental.logging.mode" -> loggingMode.name(), + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + "hoodie.delete.shuffle.parallelism" -> "1", + "hoodie.datasource.write.recordkey.field" -> "_row_key", + 
"hoodie.datasource.write.precombine.field" -> "timestamp", + "hoodie.table.name" -> ("hoodie_test" + loggingMode.name()), + "hoodie.clean.automatic" -> "true", + "hoodie.cleaner.commits.retained" -> "1" + ) + + // Insert Operation + val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Overwrite) + .save(basePath) + + metaClient = HoodieTableMetaClient.builder() + .setBasePath(basePath) + .setConf(spark.sessionState.newHadoopConf) + .build() + + // Upsert Operation + val hoodieRecords2 = dataGen.generateUniqueUpdates("001", 50) + val records2 = recordsToStrings(hoodieRecords2).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(options) + .option("hoodie.datasource.write.operation", "upsert") + .mode(SaveMode.Append) + .save(basePath) + val instant2 = metaClient.reloadActiveTimeline.lastInstant().get() + val cdcLogFiles2 = getCDCLogFile(instant2) + assertTrue(isFilesExistInFileSystem(cdcLogFiles2)) + + // Upsert Operation + val hoodieRecords3 = dataGen.generateUniqueUpdates("002", 50) + val records3 = recordsToStrings(hoodieRecords3).toList + val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2)) + inputDF3.write.format("org.apache.hudi") + .options(options) + .option("hoodie.datasource.write.operation", "upsert") + .mode(SaveMode.Append) + .save(basePath) + + // Upsert Operation + val hoodieRecords4 = dataGen.generateUniqueUpdates("003", 50) + val records4 = recordsToStrings(hoodieRecords4).toList + val inputDF4 = spark.read.json(spark.sparkContext.parallelize(records4, 2)) + inputDF4.write.format("org.apache.hudi") + .options(options) + .option("hoodie.datasource.write.operation", "upsert") + .mode(SaveMode.Append) + .save(basePath) + 
assertFalse(isFilesExistInFileSystem(cdcLogFiles2)) + } }