xushiyan commented on issue #5692:
URL: https://github.com/apache/hudi/issues/5692#issuecomment-1146777209
@alexey-chumakov thanks for the scripts; they helped a lot in verifying the issue. I ran your code with Hudi 0.11.0 on Spark 3.1.2 and 3.2.1, and it works as expected, so this was most likely fixed after 0.9.0.

```shell
./bin/spark-shell \
  --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.11.0 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
```

<details>
<summary>scripts</summary>

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql._

import java.util.UUID

val tempPath: String = s"${System.getProperty("user.dir")}/hudi-test/${UUID.randomUUID().toString}"

def loadTimeline(): HoodieActiveTimeline = {
  HoodieTableMetaClient
    .builder()
    .setConf(new Configuration())
    .setBasePath(tempPath)
    .build()
    .getActiveTimeline
}

def createDataFrame(data: Seq[String]): DataFrame = {
  import spark.implicits._
  spark.read.json(data.toDS)
}

def writeToHudi(data: Seq[String]): Unit = {
  createDataFrame(data).write
    .format("hudi")
    .option(TABLE_TYPE.key(), "MERGE_ON_READ")
    .option(HoodieWriteConfig.TBL_NAME.key(), "test")
    .option(RECORDKEY_FIELD.key(), "id")
    .option(PRECOMBINE_FIELD.key(), "version")
    .option(PARTITIONPATH_FIELD.key(), "partition")
    .option("hoodie.datasource.write.hive_style_partitioning", "true")
    .option("hoodie.finalize.write.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .mode(SaveMode.Append)
    .save(tempPath)
}

// Do an initial commit to the Hudi table
writeToHudi(
  Seq(
    """{"id": 1, "version": "1", "partition": "partition1", "value":{"x": 0}}""",
    """{"id": 2, "version": "1", "partition": "partition1", "value":{"x": 0}}"""
  )
)

// Add one more commit, which goes to another base file
writeToHudi(
  Seq(
    """{"id": 3, "version": "1", "partition": "partition1", "value":{"x": 1}}""",
    """{"id": 4, "version": "1", "partition": "partition1", "value":{"x": 1}}"""
  )
)

// Get the timestamp of the first completed commit
val firstCommit: String =
  loadTimeline().filterCompletedInstants().firstInstant().get().getTimestamp

// Time-travel read as of the first commit
val df: DataFrame = spark.read
  .format("hudi")
  .option(TIME_TRAVEL_AS_OF_INSTANT.key(), firstCommit)
  .load(s"$tempPath/*")

df.show(false)
```

</details>

It showed only the records with ids 1 and 2 (the first commit).

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
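For anyone reading along, the expected behavior can be sketched in plain Scala, independent of Spark and Hudi. This is a simplified model, not Hudi's actual implementation: it assumes Hudi instants are timestamp strings (e.g. `yyyyMMddHHmmss`-style), so lexicographic order matches chronological order, and that a time-travel read as of instant `T` only sees completed commits with timestamp `<= T`. The two commit timestamps below are made up for illustration.

```scala
// Hypothetical timestamps standing in for the two commits written by the script above.
val completedCommits = Seq("20220601120000", "20220601120500")

// Time-travel as of the first commit, analogous to `firstCommit` in the script.
val asOfInstant = completedCommits.head

// Only commits at or before the as-of instant are visible to the read,
// i.e. only the first commit's records (ids 1 and 2) show up.
val visibleCommits = completedCommits.filter(_ <= asOfInstant)

assert(visibleCommits == Seq("20220601120000"))
```

Since the second commit's timestamp sorts after the as-of instant, its records (ids 3 and 4) are excluded, matching the `df.show(false)` output described above.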