parisni opened a new issue, #6531:
URL: https://github.com/apache/hudi/issues/6531

   Hudi 0.11 and 0.12, at least.
   
   
   >  Bulk insert provides the same semantics as insert
   https://hudi.apache.org/docs/write_operations#bulk_insert
   
   From my tests this is not true. The script below produces duplicated rows with `bulk_insert`, while `insert` replaces the existing rows (the behavior we expect from upsert).
   
   I would expect `bulk_insert` and `insert` to produce the same result!
   ```
   sc.setLogLevel("WARN")
   from pyspark.sql.types import StructType, StructField, StringType
   from pyspark.sql.functions import expr
   
   env = "qa"
   datalaketable = "nested_table"
   tableName = "test_hudi_{datalaketable}".format(datalaketable=datalaketable)
   basePath = "/tmp/{tableName}".format(tableName=tableName)
   
   ##
   # Create a nested table
   ##
   data = [ (("James", None, "Smith"), "OH", "M"),
       (("Anna", "Rose", ""), "NY", "F"),
       (("Julia", "", "Williams"), "OH", "F"),
       (("Maria", "Anne", "Jones"), "NY", "M"),
       (("Jen", "Mary", "Brown"), "NY", "M"),
       (("Mike", "Mary", "Williams"), "OH", "M"),
   ]
   
   schema = StructType(
       [
           StructField(
               "name",
               StructType(
                   [
                       StructField("firstname", StringType(), True),
                       StructField("middlename", StringType(), True),
                       StructField("lastname", StringType(), True),
                   ]
               ),
           ),
           StructField("state", StringType(), True),
           StructField("gender", StringType(), True),
       ]
   )
   df = (
       spark.createDataFrame(data=data, schema=schema)
       .withColumn("event_id", expr("row_number() over(partition by 1 order by 
1)"))
       .withColumn("event_date", expr("current_date()"))
       .withColumn("version", expr("current_date()"))
   )
   
   df.printSchema()
   hudi_options = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.recordkey.field": "event_id",
       "hoodie.datasource.write.partitionpath.field": "event_date",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "insert",  # change to "bulk_insert" to reproduce
       "hoodie.datasource.write.precombine.field": "version",
       "hoodie.upsert.shuffle.parallelism": 80,
       "hoodie.insert.shuffle.parallelism": 80,
       "hoodie.delete.shuffle.parallelism": 80,
       "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.hive_sync.enable": "false",
       "hoodie.metadata.enable": "true",
   }
   
   (df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
   spark.read.format("hudi").load(basePath).count()
   
   
   (df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
   
   spark.read.format("hudi").load(basePath).count()
   
   # 12 rows with bulk_insert
   # 6 rows with insert
   ```
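   
   To confirm the duplication, a quick check (my own sketch, assuming the table written above): count rows per record key after the second write. With `bulk_insert` every key shows up twice; with `insert` each key stays unique.
   
   ```
   # Hedged verification sketch: group by the record key used above ("event_id")
   # and report any key that appears more than once after the append.
   from pyspark.sql import functions as F
   
   dupes = (
       spark.read.format("hudi")
       .load(basePath)
       .groupBy("event_id")
       .count()
       .filter(F.col("count") > 1)
   )
   dupes.show()  # non-empty with bulk_insert, empty with insert
   ```
   
   If it helps triage: my (unverified) understanding is that options such as `hoodie.combine.before.insert` only deduplicate within the incoming batch, so they would not explain the difference against already-written rows observed here.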

