[GitHub] [hudi] FredMkl opened a new issue, #6591: [SUPPORT]Duplicate records in MOR

2023-04-26 Thread via GitHub


FredMkl opened a new issue, #6591:
URL: https://github.com/apache/hudi/issues/6591

   **Describe the problem you faced**
   We use a MOR table. We found that updating an existing set of rows to another partition results in both a) a new parquet file being generated and b) an update being written to a log file. This produces duplicate records.
   
   **To Reproduce**
   ```
   // action 1: spark-dataframe write (run from spark-shell)
   import org.apache.spark.sql.SaveMode._
   import org.apache.spark.sql.functions.lit
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.spark.sql.{DataFrame, Row, SparkSession}
   import scala.collection.mutable

   val tableName = "f_schedule_test"
   val basePath = "oss://datalake-poc/fred/warehouse/dw/f_schedule_test"
   val spark = SparkSession.builder.enableHiveSupport.getOrCreate

   import spark.implicits._

   // columns: (game_schedule_id, game_id, game_date_cn, insert_date, dt)
   val df = Seq(
     ("1", "10001", "2022-08-30", "2022-08-30 12:00:00.000", "2022-08-30"),
     ("2", "10002", "2022-08-31", "2022-08-30 12:00:00.000", "2022-08-30"),
     ("3", "10003", "2022-08-31", "2022-08-30 12:00:00.000", "2022-08-30"),
     ("4", "10004", "2022-08-31", "2022-08-30 12:00:00.000", "2022-08-30"),
     ("5", "10005", "2022-08-31", "2022-08-30 12:00:00.000", "2022-08-30"),
     ("6", "10006", "2022-08-31", "2022-08-30 12:00:00.000", "2022-08-30")
   ).toDF("game_schedule_id", "game_id", "game_date_cn", "insert_date", "dt")

   // df.show()
   val hudiOptions = mutable.Map(
     "hoodie.table.name" -> tableName,  // added: required by the Hudi writer
     "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
     "hoodie.datasource.write.operation" -> "upsert",
     "hoodie.datasource.write.recordkey.field" -> "game_schedule_id",
     "hoodie.datasource.write.precombine.field" -> "insert_date",
     "hoodie.datasource.write.partitionpath.field" -> "dt",
     "hoodie.index.type" -> "GLOBAL_BLOOM",  // global index: record keys are unique across partitions
     "hoodie.compact.inline" -> "true",      // compact synchronously after writes
     "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator"
   )

   // step 1: initial insert -- no issue
   df.write.format("hudi").
     options(hudiOptions).
     mode(Append).
     save(basePath)

   // step 2: move part of the data to another partition -- no issue
   // (queries below assume the table is registered in Hive as dw.f_schedule_test)
   val df1 = spark.sql("select * from dw.f_schedule_test where dt = '2022-08-30'").
     withColumn("dt", lit("2022-08-31")).limit(3)
   df1.write.format("hudi").
     options(hudiOptions).
     mode(Append).
     save(basePath)

   // step 3: move the data back -- duplicates occur
   // Per the Hudi docs: updating an existing set of rows will result in either
   // a) a companion log/delta file for an existing base parquet file generated
   // from a previous compaction, or b) an update written to a log/delta file in
   // case no compaction ever happened for it.
   val df2 = spark.sql("select * from dw.f_schedule_test where dt = '2022-08-31'").
     withColumn("dt", lit("2022-08-30")).limit(3)
   df2.write.format("hudi").
     options(hudiOptions).
     mode(Append).
     save(basePath)
   ```
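   For context, Hudi's global indexes expose a setting that is meant to govern exactly this cross-partition update case: `hoodie.bloom.index.update.partition.path`. When set to `true`, an upsert whose partition path changed should delete the record from its old partition and insert it into the new one, rather than leaving a copy in both. Below is a minimal sketch of how we would re-run step 3 with it (we have not verified this behavior on 0.10.1):
   ```
   // Hedged sketch, not a confirmed fix: tell the global bloom index to
   // follow partition-path changes (delete from the old partition, insert
   // into the new one) instead of updating the record in place.
   val optionsFollowPartition = hudiOptions ++ Map(
     "hoodie.bloom.index.update.partition.path" -> "true"
   )

   df2.write.format("hudi").
     options(optionsFollowPartition).
     mode(Append).
     save(basePath)
   ```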
   **Checking scripts:**
   ```
   select * from dw.f_schedule_test where game_schedule_id = '1';

   select _hoodie_file_name, count(*) as co from dw.f_schedule_test group by _hoodie_file_name;
   ```
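   The same duplicate check can also be run straight against the table path, bypassing the Hive table. A minimal sketch using the Hudi Spark datasource (a plain `load` defaults to a snapshot query, which merges base parquet files with log files):
   ```
   // Read the table directly from storage; the default query type is
   // "snapshot", so base files and log files are merged on read.
   val snapshot = spark.read.format("hudi").load(basePath)

   // Any record key appearing more than once is a duplicate.
   snapshot.groupBy("game_schedule_id").
     count().
     filter("count > 1").
     show()
   ```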
   
   **Results:**
   ![Screenshot 2022-09-05 13 36 10](https://user-images.githubusercontent.com/110440662/188367978-138fdce1-24d8-4cfa-ae81-82467f9cde09.png)
   ![Screenshot 2022-09-05 13 36 19](https://user-images.githubusercontent.com/110440662/188368061-c7846de6-2490-475f-99ec-d240c5307033.png)
   
   
   **Expected behavior**
   
   Duplicate records should not occur: after step 3, each game_schedule_id should appear exactly once, back in partition dt=2022-08-30.
   
   **Environment Description**
   
   * Hudi version: 0.10.1

   * Spark version: 3.2.1

   * Hive version: 3.1.2

   * Hadoop version: 3.2.1

   * Storage (HDFS/S3/GCS..): OSS

   * Running on Docker? (yes/no): no
   [hoodie.zip](https://github.com/apache/hudi/files/9487065/hoodie.zip)
   
   
   **Stacktrace**
   
   Please check the logs in the attached [hoodie.zip](https://github.com/apache/hudi/files/9487078/hoodie.zip).
   
   

