huzekang commented on issue #2656:
URL: https://github.com/apache/hudi/issues/2656#issuecomment-820287948


   I have the same problem.
   When I write to Hudi with the insert operation, I expect the result to contain 10 records, but only 8 come back.
   It behaves just like the upsert operation.
   ```
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SparkSession

// Wrapper object (name is arbitrary) so the snippet compiles standalone
object HudiInsertTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Uses Hive SerDe; this is mandatory for MoR tables
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .config("spark.hadoop.fs.defaultFS", "hdfs://hadoop-master:8020")
      .getOrCreate()
   
    val tableName = "hudi_archive_insert"
    val basePath = "/tmp/hudi/" + tableName
   
    // First batch: 6 records; three of them share id=1 in partition 20191212
    val inserts = List(
      """{"id" : 1, "name": "iteblog1", "age" : 101, "ts" : 1, "dt" : "20191212"}""",
      """{"id" : 1, "name": "iteblog2", "age" : 101, "ts" : 2, "dt" : "20191212"}""",
      """{"id" : 1, "name": "iteblog3", "age" : 101, "ts" : 3, "dt" : "20191212"}""",
      """{"id" : 3, "name": "hudi2", "age" : 103, "ts" : 1, "dt" : "20191212"}""",
      """{"id" : 2, "name": "iteblog_hadoop2", "age" : 102, "ts" : 3, "dt" : "20191213"}""",
      """{"id" : 1, "name": "flink2", "age" : 102, "ts" : 1, "dt" : "20191213"}"""
    )
   
       insert(spark, inserts, tableName, basePath)
   
    // Second batch: 4 more records, so 10 inserted in total
    val inserts2 = List(
      """{"id" : 4, "name": "Dingding", "age" : 101, "ts" : 1, "dt" : "20191212"}""",
      """{"id" : 5, "name": "Kugou", "age" : 101, "ts" : 2, "dt" : "20191212"}""",
      """{"id" : 2, "name": "Mumu", "age" : 102, "ts" : 1, "dt" : "20191213"}""",
      """{"id" : 2, "name": "Mumu2", "age" : 102, "ts" : 2, "dt" : "20191213"}"""
    )
   
       insert(spark, inserts2, tableName, basePath)
   
       val df = spark.read.format("org.apache.hudi").load(basePath + "/*")
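    // Sanity check (added): the two batches wrote 10 records, but this prints 8
    println(s"row count = ${df.count()}")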
       df.show()
    // +-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---+--------+---+
    // |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|age|      dt| id|    name| ts|
    // +-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---+--------+---+
    // |     20210415173137|  20210415173137_1_1|                 1|              20191212|87062f4a-1de0-416...|101|20191212|  1|iteblog3|  3|
    // |     20210415173137|  20210415173137_1_3|                 1|              20191212|87062f4a-1de0-416...|101|20191212|  1|iteblog3|  3|
    // |     20210415173137|  20210415173137_1_5|                 1|              20191212|87062f4a-1de0-416...|101|20191212|  1|iteblog3|  3|
    // |     20210415173137|  20210415173137_1_6|                 3|              20191212|87062f4a-1de0-416...|103|20191212|  3|   hudi2|  1|
    // |     20210415173144|  20210415173144_1_8|                 4|              20191212|87062f4a-1de0-416...|101|20191212|  4|Dingding|  1|
    // |     20210415173144|  20210415173144_1_9|                 5|              20191212|87062f4a-1de0-416...|101|20191212|  5|   Kugou|  2|
    // |     20210415173144|  20210415173144_0_7|                 2|              20191213|f93a2c86-8eda-4cc...|102|20191213|  2|   Mumu2|  2|
    // |     20210415173137|  20210415173137_0_4|                 1|              20191213|f93a2c86-8eda-4cc...|102|20191213|  1|  flink2|  1|
    // +-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---+--------+---+
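    // Only 8 rows come back although 10 records were inserted:
    // "iteblog_hadoop2" and "Mumu" are missing, and the three id=1 rows in
    // partition 20191212 all carry the values of "iteblog3" (the record with
    // the highest ts), exactly as an upsert would behave.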
   
       spark.stop()
     }
   
  def insert(spark: SparkSession, inserts: List[String], tableName: String, basePath: String): Unit = {
   
    val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

    df.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .option(HoodieWriteConfig.TABLE_NAME, tableName)
      // Keep shuffle parallelism low for this small local test
      .option("hoodie.insert.shuffle.parallelism", "2")
      .mode("append")
      .save(basePath)
   
  }
}
   ```
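
From the output it looks like inserts that get routed into existing file groups are merged by record key on the way in. A minimal sketch of what I want to try next, assuming the `hoodie.merge.allow.duplicate.on.inserts` write config is available in my Hudi version (I have not verified this; the key is my assumption from reading the write configs) and that it keeps duplicate keys on insert:

```
// Hypothetical variant of the writer above; the config key below is an
// assumption on my side and may not exist in every Hudi release.
df.write.format("org.apache.hudi")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  // Ask Hudi to keep duplicate keys when inserts are packed into existing small files
  .option("hoodie.merge.allow.duplicate.on.inserts", "true")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .mode("append")
  .save(basePath)
```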

