xushiyan commented on issue #5692:
URL: https://github.com/apache/hudi/issues/5692#issuecomment-1146777209

   @alexey-chumakov thanks for the scripts; they helped a lot in verifying the
issue. I ran your code with Hudi 0.11.0 and Spark 3.1.2 / 3.2.1, and it works as
expected, so this was most likely fixed in a release after 0.9.0.
   
   ```shell
   # Launch spark-shell with the Hudi 0.11.0 Spark 3.1 bundle, Kryo serialization,
   # and the Hudi Spark SQL session extension enabled.
   ./bin/spark-shell \
   --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.11.0 \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
   ```
   
   <details>
   <summary>scripts</summary>
   
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.common.table.HoodieTableMetaClient
   import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.spark.SparkConf
   import org.apache.spark.sql._
   
   import java.util.UUID
   
   // Base path of the test table: <working dir>/hudi-test/<random UUID>,
   // so every run of the script writes to a fresh location.
   val tempPath: String = {
     val workingDir = System.getProperty("user.dir")
     val runId = UUID.randomUUID().toString
     s"$workingDir/hudi-test/$runId"
   }
   
   /**
    * Builds a meta client for the table at `tempPath` and returns its active timeline.
    *
    * The meta client is created with a new, default Hadoop `Configuration`.
    *
    * @return the active timeline of the table rooted at `tempPath`
    */
   def loadTimeline(): HoodieActiveTimeline = {
       val metaClient = HoodieTableMetaClient.builder()
         .setConf(new Configuration())
         .setBasePath(tempPath)
         .build()
       metaClient.getActiveTimeline
   }
   
   /**
    * Turns the given strings into a DataFrame by reading them with `spark.read.json`.
    *
    * @param data JSON documents, one per element
    * @return the DataFrame produced by the JSON reader
    */
   def createDataFrame(data: Seq[String]): DataFrame = {
       import spark.implicits._
       val jsonRecords: Dataset[String] = data.toDS()
       spark.read.json(jsonRecords)
   }
   
   /**
    * Appends the given JSON records to the Hudi table at `tempPath`.
    *
    * The records are converted with `createDataFrame` and written in `SaveMode.Append`
    * using the write options collected below.
    *
    * @param data JSON documents to write, one per element
    */
   def writeToHudi(data: Seq[String]): Unit = {
       // All write options gathered in one place; keys and values are unchanged.
       val hudiOptions: Map[String, String] = Map(
         TABLE_TYPE.key() -> "MERGE_ON_READ",
         HoodieWriteConfig.TBL_NAME.key() -> "test",
         RECORDKEY_FIELD.key() -> "id",
         PRECOMBINE_FIELD.key() -> "version",
         PARTITIONPATH_FIELD.key() -> "partition",
         "hoodie.datasource.write.hive_style_partitioning" -> "true",
         "hoodie.finalize.write.parallelism" -> "2",
         "hoodie.upsert.shuffle.parallelism" -> "2"
       )

       createDataFrame(data)
         .write
         .format("hudi")
         .options(hudiOptions)
         .mode(SaveMode.Append)
         .save(tempPath)
   }
   
     // Do an initial commit to Hudi (records with id 1 and 2).
     // Each JSON record is kept on one physical line so the literal holds a single-line
     // JSON document with no embedded line break.
     writeToHudi(
       Seq(
         """{"id": 1, "version": "1", "partition": "partition1", "value":{"x": 0}}""",
         """{"id": 2, "version": "1", "partition": "partition1", "value":{"x": 0}}"""
       )
     )
   
     // Add one more commit to another base file (records with id 3 and 4).
     writeToHudi(
       Seq(
         """{"id": 3, "version": "1", "partition": "partition1", "value":{"x": 1}}""",
         """{"id": 4, "version": "1", "partition": "partition1", "value":{"x": 1}}"""
       )
     )
   
     // Get first commit: the timestamp of the first completed instant on the active timeline.
     val firstCommit: String =
       loadTimeline().filterCompletedInstants().firstInstant().get().getTimestamp
   
     // Time-travel read of the table as of the first commit.
     val df: DataFrame = spark.read.format("hudi").
       option(TIME_TRAVEL_AS_OF_INSTANT.key(), firstCommit).
       load(s"$tempPath/*")
   
     // Print the rows without truncating column values.
     df.show(false)
   ```
   </details>
   
   The output showed only the records with id 1 and 2 (those written by the first commit).
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to