[GitHub] [hudi] FredMkl opened a new issue, #6591: [SUPPORT]Duplicate records in MOR
FredMkl opened a new issue, #6591:
URL: https://github.com/apache/hudi/issues/6591

**Describe the problem you faced**

We use a MOR table. We found that updating an existing set of rows so that they move to another partition results in both a) a new parquet file being generated and b) an update being written to a log file. This produces duplicate records.

**To Reproduce**

```
// action1: spark-dataframe write
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.functions.lit // required by lit() in step2 and step3
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable

val tableName = "f_schedule_test"
val basePath = "oss://datalake-poc/fred/warehouse/dw/f_schedule_test"
val spark = SparkSession.builder.enableHiveSupport.getOrCreate
import spark.implicits._ // spark-shell

val df = Seq(
  ("1", "10001", "2022-08-30", "2022-08-30 12:00:00.000", "2022-08-30"),
  ("2", "10002", "2022-08-31", "2022-08-30 12:00:00.000", "2022-08-30"),
  ("3", "10003", "2022-08-31", "2022-08-30 12:00:00.000", "2022-08-30"),
  ("4", "10004", "2022-08-31", "2022-08-30 12:00:00.000", "2022-08-30"),
  ("5", "10005", "2022-08-31", "2022-08-30 12:00:00.000", "2022-08-30"),
  ("6", "10006", "2022-08-31", "2022-08-30 12:00:00.000", "2022-08-30")
).toDF("game_schedule_id", "game_id", "game_date_cn", "insert_date", "dt")
// df.show()

val hudiOptions = mutable.Map(
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.recordkey.field" -> "game_schedule_id",
  "hoodie.datasource.write.precombine.field" -> "insert_date",
  "hoodie.datasource.write.partitionpath.field" -> "dt",
  "hoodie.index.type" -> "GLOBAL_BLOOM",
  "hoodie.compact.inline" -> "true",
  "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator"
)

// step1: insert --no issue
df.write.format("hudi").
  options(hudiOptions).
  mode(Append).
  save(basePath)

// step2: move part of the data to another partition --no issue
val df1 = spark.sql("select * from dw.f_schedule_test where dt = '2022-08-30'").withColumn("dt", lit("2022-08-31")).limit(3)
df1.write.format("hudi").
  options(hudiOptions).
  mode(Append).
  save(basePath)

// step3: move back --duplicate occurs
// Updating an existing set of rows will result in either
// a) a companion log/delta file for an existing base parquet file generated from a previous compaction, or
// b) an update written to a log/delta file in case no compaction ever happened for it.
val df2 = spark.sql("select * from dw.f_schedule_test where dt = '2022-08-31'").withColumn("dt", lit("2022-08-30")).limit(3)
df2.write.format("hudi").
  options(hudiOptions).
  mode(Append).
  save(basePath)
```

**Checking scripts:**

```
select * from dw.f_schedule_test where game_schedule_id = 1;
select _hoodie_file_name, count(*) as co from dw.f_schedule_test group by _hoodie_file_name;
```

**results:**

![Screenshot 2022-09-05 13 36 10](https://user-images.githubusercontent.com/110440662/188367978-138fdce1-24d8-4cfa-ae81-82467f9cde09.png)
![Screenshot 2022-09-05 13 36 19](https://user-images.githubusercontent.com/110440662/188368061-c7846de6-2490-475f-99ec-d240c5307033.png)

**Expected behavior**

Duplicate records should not occur.

**Environment Description**

* Hudi version : 0.10.1
* Spark version : 3.2.1
* Hive version : 3.1.2
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : OSS
* Running on Docker? (yes/no) : no

[hoodie.zip](https://github.com/apache/hudi/files/9487065/hoodie.zip)

**Stacktrace**

```
Please check the attached logs.
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
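
The checking scripts in the report count rows per file. A complementary check, sketched here and not part of the original report, groups a snapshot read by Hudi's `_hoodie_record_key` meta column and lists the partitions and files in which each repeated key appears. The helper name `findDuplicateKeys` is hypothetical, and the sketch assumes the table is read with Hudi's meta columns present (`_hoodie_record_key`, `_hoodie_partition_path`, `_hoodie_file_name`).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_set, count}

// Hypothetical helper: returns one row per record key that occurs more than
// once in the given snapshot, with the partitions and files holding the copies.
def findDuplicateKeys(snapshot: DataFrame): DataFrame =
  snapshot
    .groupBy(col("_hoodie_record_key"))
    .agg(
      count("*").as("copies"),
      collect_set(col("_hoodie_partition_path")).as("partitions"),
      collect_set(col("_hoodie_file_name")).as("files"))
    .filter(col("copies") > 1)
```

In spark-shell this could be called as `findDuplicateKeys(spark.table("dw.f_schedule_test")).show(false)`. A key that shows up under more than one partition path would be consistent with the move-between-partitions scenario described in step2 and step3.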