parisni opened a new issue, #6531: URL: https://github.com/apache/hudi/issues/6531
hudi 0.11 / 0.12 at least > Bulk insert provides the same semantics as insert https://hudi.apache.org/docs/write_operations#bulk_insert From my tests this is not true. The below script results in duplicated rows with `bulk insert`, while `insert` replaces rows (what we expect from upsert) I would expect both bulk_insert / insert share same results ! ``` sc.setLogLevel("WARN") from pyspark.sql.types import StructType, StructField, StringType env = "qa" datalaketable = "nested_table" tableName = "test_hudi_{datalaketable}".format(datalaketable=datalaketable) basePath = "/tmp/{tableName}".format(tableName=tableName) from pyspark.sql.functions import expr ## # Create a nested table ## data = [ (("James", None, "Smith"), "OH", "M"), (("Anna", "Rose", ""), "NY", "F"), (("Julia", "", "Williams"), "OH", "F"), (("Maria", "Anne", "Jones"), "NY", "M"), (("Jen", "Mary", "Brown"), "NY", "M"), (("Mike", "Mary", "Williams"), "OH", "M"), ] schema = StructType( [ StructField( "name", StructType( [ StructField("firstname", StringType(), True), StructField("middlename", StringType(), True), StructField("lastname", StringType(), True), ] ), ), StructField("state", StringType(), True), StructField("gender", StringType(), True), ] ) df = ( spark.createDataFrame(data=data, schema=schema) .withColumn("event_id", expr("row_number() over(partition by 1 order by 1)")) .withColumn("event_date", expr("current_date()")) .withColumn("version", expr("current_date()")) ) df.printSchema() hudi_options = { "hoodie.table.name": tableName, "hoodie.datasource.write.recordkey.field": "event_id", "hoodie.datasource.write.partitionpath.field": "event_date", "hoodie.datasource.write.table.name": tableName, "hoodie.datasource.write.operation": "insert", # change with bulk_insert "hoodie.datasource.write.precombine.field": "version", "hoodie.upsert.shuffle.parallelism": 80, "hoodie.insert.shuffle.parallelism": 80, "hoodie.delete.shuffle.parallelism": 80, "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator", "hoodie.datasource.write.hive_style_partitioning": "true", "hoodie.datasource.hive_sync.enable": "false", "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator", "hoodie.metadata.enable": "true", } (df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)) spark.read.format("hudi").load(basePath).count() (df.write.format("hudi").options(**hudi_options).mode("append").save(basePath)) spark.read.format("hudi").load(basePath).count() // 12 with bulk insert // 6 with insert ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org