huzekang commented on issue #2656: URL: https://github.com/apache/hudi/issues/2656#issuecomment-820287948
I have the same problem. When I use the insert operation with Hudi, I expect the result to have 10 records, but there are only 8 records. It behaves just like the upsert operation. ``` val spark = SparkSession.builder() .master("local[*]") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Uses Hive SerDe, this is mandatory for MoR tables .config("spark.sql.hive.convertMetastoreParquet", "false") .config("spark.hadoop.fs.defaultFS", "hdfs://hadoop-master:8020") .getOrCreate() val tableName = "hudi_archive_insert" val basePath = "/tmp/hudi/"+tableName val inserts = List( """{"id" : 1, "name": "iteblog1", "age" : 101, "ts" : 1, "dt" : "20191212"}""", """{"id" : 1, "name": "iteblog2", "age" : 101, "ts" : 2, "dt" : "20191212"}""", """{"id" : 1, "name": "iteblog3", "age" : 101, "ts" : 3, "dt" : "20191212"}""", """{"id" : 3, "name": "hudi2", "age" : 103, "ts" : 1, "dt" : "20191212"}""", """{"id" : 2, "name": "iteblog_hadoop2", "age" : 102, "ts" : 3, "dt" : "20191213"}""", """{"id" : 1, "name": "flink2", "age" : 102, "ts" : 1, "dt" : "20191213"}""" ) insert(spark, inserts, tableName, basePath) val inserts2 = List( """{"id" : 4, "name": "Dingding", "age" : 101, "ts" : 1, "dt" : "20191212"}""", """{"id" : 5, "name": "Kugou", "age" : 101, "ts" : 2, "dt" : "20191212"}""", """{"id" : 2, "name": "Mumu", "age" : 102, "ts" : 1, "dt" : "20191213"}""", """{"id" : 2, "name": "Mumu2", "age" : 102, "ts" : 2, "dt" : "20191213"}""" ) insert(spark, inserts2, tableName, basePath) val df = spark.read.format("org.apache.hudi").load(basePath + "/*") df.show() // +-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---+--------+---+ // |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|age| dt| id| name| ts| // +-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---+--------+---+ // | 20210415173137| 20210415173137_1_1| 
1| 20191212|87062f4a-1de0-416...|101|20191212| 1|iteblog3| 3| // | 20210415173137| 20210415173137_1_3| 1| 20191212|87062f4a-1de0-416...|101|20191212| 1|iteblog3| 3| // | 20210415173137| 20210415173137_1_5| 1| 20191212|87062f4a-1de0-416...|101|20191212| 1|iteblog3| 3| // | 20210415173137| 20210415173137_1_6| 3| 20191212|87062f4a-1de0-416...|103|20191212| 3| hudi2| 1| // | 20210415173144| 20210415173144_1_8| 4| 20191212|87062f4a-1de0-416...|101|20191212| 4|Dingding| 1| // | 20210415173144| 20210415173144_1_9| 5| 20191212|87062f4a-1de0-416...|101|20191212| 5| Kugou| 2| // | 20210415173144| 20210415173144_0_7| 2| 20191213|f93a2c86-8eda-4cc...|102|20191213| 2| Mumu2| 2| // | 20210415173137| 20210415173137_0_4| 1| 20191213|f93a2c86-8eda-4cc...|102|20191213| 1| flink2| 1| // +-------------------+--------------------+------------------+----------------------+--------------------+---+--------+---+--------+---+ spark.stop() } def insert(spark: SparkSession, inserts: List[String], tableName: String, basePath: String) = { val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("org.apache.hudi") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts") .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .option(HoodieWriteConfig.TABLE_NAME, tableName) // Set this to a lower value to improve performance. .option("hoodie.insert.shuffle.parallelism", "2") .mode("append") .save(basePath) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org