yihua commented on a change in pull request #4268:
URL: https://github.com/apache/hudi/pull/4268#discussion_r770127985
##
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
##
@@ -170,4 +175,73 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
       .load(basePath)
     assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("insert_overwrite", "delete_partition"))
+  def testArchivalWithReplaceCommitActions(writeOperation: String): Unit = {
+
+    val dataGen = new HoodieTestDataGenerator()
+    // use this to generate records only for certain partitions.
+    val dataGenPartition1 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH))
+    val dataGenPartition2 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))
+
+    // do one bulk insert to all partitions
+    val records = recordsToStrings(dataGen.generateInserts("%05d".format(1), 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    val partition1RecordCount = inputDF.filter(row => row.getAs("partition_path")
+      .equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).count()
+    inputDF.write.format("hudi")
+      .options(commonOpts)
+      .option("hoodie.keep.min.commits", "2")
+      .option("hoodie.keep.max.commits", "3")
+      .option("hoodie.cleaner.commits.retained", "1")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertRecordCount(basePath, 100)
+
+    // issue insert_overwrite or delete_partition (per the test parameter) to partition1
+    writeRecords(2, dataGenPartition1, writeOperation, basePath)
+
+    // insert_overwrite replaces partition1's records with 100 fresh ones; delete_partition just removes them
+    val expectedRecCount = if (writeOperation.equals(DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
+      200 - partition1RecordCount
+    } else {
+      100 - partition1RecordCount
+    }
+    assertRecordCount(basePath, expectedRecCount)
+
+    // add more data to partition2.
+    for (i <- 3 to 7) {
+      writeRecords(i, dataGenPartition2, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, basePath)
+    }
+
+    assertRecordCount(basePath, expectedRecCount + 500)
+    val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
+      .setLoadActiveTimelineOnLoad(true).build()
+    val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
+      .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
+    // assert replace commit is archived and not part of active timeline.
+    assertFalse(commits.contains(HoodieTimeline.REPLACE_COMMIT_ACTION))
Review comment:
Add another check to make sure the replace commit is in the archived timeline?
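
For reference, a minimal sketch of what that check could look like, mirroring the active-timeline assertion above (this assumes metaClient.getArchivedTimeline loads the archived instants in this Hudi version, and that assertTrue is available from the JUnit assertions the test already uses):

    // sketch: assert the replace commit now appears in the archived timeline
    val archivedActions = metaClient.getArchivedTimeline.getInstants.toArray
      .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
    assertTrue(archivedActions.contains(HoodieTimeline.REPLACE_COMMIT_ACTION))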
##
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
Review comment:
Shall we avoid these new imports, which are not used?
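
Assuming HoodieTimeline is indeed unreferenced in TestCOWDataSource.scala, the change would simply revert to the original single import:

    import org.apache.hudi.common.table.timeline.HoodieInstant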