pranotishanbhag opened a new issue #3008:
URL: https://github.com/apache/hudi/issues/3008


   **_Tips before filing an issue_**
   
   - Have you gone through our 
[FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)? 
   YES
   
   - Join the mailing list to engage in conversations and get faster support at 
dev-subscr...@hudi.apache.org.
   Part of the Slack groups. Did not find resolution there.
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   I am not sure this is a bug but after the analysis we can check.
   
   **Describe the problem you faced**
   
   I am working on unit tests for some of the operations we would perform. My 
tests fail in the following two scenarios:
   1. The Hive table is not updated when a DELETE operation is run on the dataset.
   2. The Hive table is not updated (I get an empty table) when no partitions are 
specified.
   
   ### Details on Issue 1:
   I am trying to sync a hive table on upsert (works fine) and on delete (does 
not work) in my unit tests. As per the doc 
[Hudi_Writing-Data](https://hudi.apache.org/docs/writing_data.html#key-generation),
 we need to use GlobalDeleteKeyGenerator class for delete: 
   
   > classOf[GlobalDeleteKeyGenerator].getCanonicalName (to be used when 
OPERATION_OPT_KEY is set to DELETE_OPERATION_OPT_VAL)
   
   However, when I use this class I get the error: 
   
   > “Cause: java.lang.InstantiationException: 
org.apache.hudi.keygen.GlobalDeleteKeyGenerator” 
   
   These are the hudi properties set on save:
   ```
   (hoodie.datasource.hive_sync.database,default)
   (hoodie.combine.before.insert,true)
   (hoodie.embed.timeline.server,false)
   (hoodie.insert.shuffle.parallelism,2)
   (hoodie.datasource.write.precombine.field,timestamp)
   (hoodie.datasource.hive_sync.partition_fields,partition)
   (hoodie.datasource.hive_sync.use_jdbc,false)
   (hoodie.datasource.hive_sync.partition_extractor_class,org.apache.hudi.keygen.GlobalDeleteKeyGenerator)
   (hoodie.delete.shuffle.parallelism,2)
   (hoodie.datasource.hive_sync.table,TestHudiTable)
   (hoodie.index.type,GLOBAL_BLOOM)
   (hoodie.datasource.write.operation,DELETE)
   (hoodie.datasource.hive_sync.enable,true)
   (hoodie.datasource.write.recordkey.field,id)
   (hoodie.table.name,TestHudiTable)
   (hoodie.datasource.write.table.type,COPY_ON_WRITE)
   (hoodie.datasource.write.hive_style_partitioning,true)
   (hoodie.upsert.shuffle.parallelism,2)
   (hoodie.cleaner.commits.retained,15)
   (hoodie.datasource.write.partitionpath.field,partition)
   ```
   If I switch to the MultiPartKeysValueExtractor class, the deletes are not 
propagated to the Hive table. The Hudi read, 
spark.read.format("hudi").load(BasePath), returns the right data with the id 
deleted, but spark.table("TableName") is inconsistent and still contains the id 
that was supposed to be deleted. For example, id1 is deleted in Hudi but is still 
in the Hive table:
   ```
   spark.read.format("hudi").load(<BasePath>)
   +-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+---+---------+---------+-----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                    |id |timestamp|partition|data |
   +-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+---+---------+---------+-----+
   |20210528125210     |20210528125210_0_3  |id2               |partition=p1          |9b3aae86-e4ed-4bfa-b6cd-ee41db11ac15-0_0-23-26_20210528125210.parquet|id2|1        |p1       |data2|
   |20210528125210     |20210528125210_1_1  |id3               |partition=p2          |a53dba79-90a2-4370-8637-e39301b4c10c-0_1-23-27_20210528125210.parquet|id3|3        |p2       |data3|
   +-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+---+---------+---------+-----+
   
   // id1 is supposed to be deleted; the class used is MultiPartKeysValueExtractor (GlobalDeleteKeyGenerator raises an error).
   spark.table(<TableName>).show(false)
   +---+---------+-----+---------+
   |id |timestamp|data |partition|
   +---+---------+-----+---------+
   |id1|1        |data1|p1       |
   |id2|1        |data2|p1       |
   |id3|3        |data3|p2       |
   +---+---------+-----+---------+
   ```
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   ```
     def getSnapshotData: Dataset[TestData] = {
       Seq(
         TestData(id = "id1", timestamp = 1, partition = "p1", data = "data1"),
         TestData(id = "id2", timestamp = 1, partition = "p1", data = "data2"),
         TestData(id = "id3", timestamp = 3, partition = "p2", data = "data3")
       ).toDS
     }
   
   // Initial write: upsert the snapshot and sync it to Hive.
   val hudiOptions = Map(
     "hoodie.datasource.hive_sync.database" -> "default",
     "hoodie.combine.before.insert" -> "true",
     "hoodie.embed.timeline.server" -> "false",
     "hoodie.insert.shuffle.parallelism" -> "2",
     "hoodie.datasource.write.precombine.field" -> "timestamp",
     "hoodie.datasource.hive_sync.partition_fields" -> "partition",
     "hoodie.datasource.hive_sync.use_jdbc" -> "false",
     "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
     "hoodie.datasource.hive_sync.table" -> "TestHudiTable",
     "hoodie.index.type" -> "GLOBAL_BLOOM",
     "hoodie.datasource.write.operation" -> "UPSERT",
     "hoodie.datasource.hive_sync.enable" -> "true",
     "hoodie.datasource.write.recordkey.field" -> "id",
     "hoodie.table.name" -> "TestHudiTable",
     "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
     "hoodie.datasource.write.hive_style_partitioning" -> "true",
     "hoodie.upsert.shuffle.parallelism" -> "2",
     "hoodie.cleaner.commits.retained" -> "15",
     "hoodie.datasource.write.partitionpath.field" -> "partition")
   
   snapshot.write.mode("overwrite").options(hudiOptions).format("hudi").save(<BasePath>)
   
   val deleteDF = snapshot.filter("id = 'id1'")
   
   // Delete write: hard-delete id1 and sync to Hive.
   val deleteHudiOptions = Map(
     "hoodie.datasource.hive_sync.database" -> "default",
     "hoodie.combine.before.insert" -> "true",
     "hoodie.embed.timeline.server" -> "false",
     "hoodie.insert.shuffle.parallelism" -> "2",
     "hoodie.datasource.write.precombine.field" -> "timestamp",
     "hoodie.datasource.hive_sync.partition_fields" -> "partition",
     "hoodie.datasource.hive_sync.use_jdbc" -> "false",
     "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.keygen.GlobalDeleteKeyGenerator",
     "hoodie.delete.shuffle.parallelism" -> "2",
     "hoodie.datasource.hive_sync.table" -> "TestHudiTable",
     "hoodie.index.type" -> "GLOBAL_BLOOM",
     "hoodie.datasource.write.operation" -> "DELETE",
     "hoodie.datasource.hive_sync.enable" -> "true",
     "hoodie.datasource.write.recordkey.field" -> "id",
     "hoodie.table.name" -> "TestHudiTable",
     "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
     "hoodie.datasource.write.hive_style_partitioning" -> "true",
     "hoodie.upsert.shuffle.parallelism" -> "2",
     "hoodie.cleaner.commits.retained" -> "15",
     "hoodie.datasource.write.partitionpath.field" -> "partition")
   
   deleteDF.write.mode("append").options(deleteHudiOptions).format("hudi").save(<BasePath>)
   ```
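
For comparison with the delete options above, here is a minimal, untested sketch of an options map in which `GlobalDeleteKeyGenerator` is set under `hoodie.datasource.write.keygenerator.class` and the Hive sync extractor stays `MultiPartKeysValueExtractor`. It assumes that the documentation quote refers to the write-side key generator setting rather than the partition extractor; the object and value names (`DeleteOptionsSketch`, `common`, `deleteOptions`) are made up for illustration, and this is not a confirmed fix for either scenario.

```scala
object DeleteOptionsSketch {
  // Options shared by the upsert and delete writes (values copied from the report).
  val common: Map[String, String] = Map(
    "hoodie.table.name" -> "TestHudiTable",
    "hoodie.datasource.write.recordkey.field" -> "id",
    "hoodie.datasource.write.partitionpath.field" -> "partition",
    "hoodie.datasource.write.precombine.field" -> "timestamp",
    "hoodie.datasource.hive_sync.enable" -> "true",
    "hoodie.datasource.hive_sync.table" -> "TestHudiTable",
    "hoodie.datasource.hive_sync.partition_fields" -> "partition",
    // The Hive sync side keeps a partition value extractor here.
    "hoodie.datasource.hive_sync.partition_extractor_class" ->
      "org.apache.hudi.hive.MultiPartKeysValueExtractor"
  )

  // Assumption: the key generator belongs under the write-side keygenerator setting.
  val deleteOptions: Map[String, String] = common ++ Map(
    "hoodie.datasource.write.operation" -> "DELETE",
    "hoodie.datasource.write.keygenerator.class" ->
      "org.apache.hudi.keygen.GlobalDeleteKeyGenerator"
  )
}
```

The map would be passed to the writer the same way as `deleteHudiOptions`, via `.options(DeleteOptionsSketch.deleteOptions)`.
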
   **Expected behavior**
   
   Expecting to delete records and sync the information in hive.
   
   **Environment Description**
   
   Hudi version : 0.8
   
   Spark version : 2.4
   
   Hive version : 2.5
   
   Hadoop version : 2.5
   
   Storage (HDFS/S3/GCS..) : Local testing. Production datasets in S3.
   
   Running on Docker? (yes/no) : no
   
   **Stacktrace**
   
   ```
    - should hard delete records from hudi table with hive sync *** FAILED *** (24 seconds, 49 milliseconds)
   Cause: java.lang.NoSuchMethodException: org.apache.hudi.keygen.GlobalDeleteKeyGenerator.<init>()
   [scalatest]   at java.lang.Class.getConstructor0(Class.java:3110)
   [scalatest]   at java.lang.Class.newInstance(Class.java:412)
   [scalatest]   at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:98)
   [scalatest]   at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:69)
   [scalatest]   at org.apache.hudi.HoodieSparkSqlWriter$.org$apache$hudi$HoodieSparkSqlWriter$$syncHive(HoodieSparkSqlWriter.scala:391)
   [scalatest]   at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:440)
   [scalatest]   at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:436)
   [scalatest]   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
   [scalatest]   at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:436)
   [scalatest]   at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:497)
   [scalatest]   at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:222)
   [scalatest]   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
   [scalatest]   at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
   [scalatest]   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
   [scalatest]   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
   [scalatest]   at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
   [scalatest]   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
   [scalatest]   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
   [scalatest]   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
   [scalatest]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   [scalatest]   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
   [scalatest]   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
   [scalatest]   at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
   [scalatest]   at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
   [scalatest]   at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
   [scalatest]   at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
   [scalatest]   at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
   [scalatest]   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
   [scalatest]   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
   [scalatest]   at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
   [scalatest]   at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
   [scalatest]   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
   [scalatest]   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
   [scalatest]   at com.amazon.sm.hudi.framework.common.TestUtils.writeHudiData(TestUtils.scala:148)
   ```
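
The top frames of the trace (`Class.newInstance` calling `Class.getConstructor0`, failing on `<init>()`) suggest that the configured extractor class is instantiated reflectively through a no-argument constructor, and that `GlobalDeleteKeyGenerator` does not expose one. The sketch below reproduces that failure mode in isolation; `ReflectionSketch`, `NeedsProps`, and `instantiateNoArg` are hypothetical names used only for illustration and are not part of Hudi.

```scala
import java.util.Properties
import scala.util.Try

object ReflectionSketch {
  // Hypothetical class whose only constructor takes an argument.
  class NeedsProps(val props: Properties)

  // Look up a no-arg constructor reflectively and instantiate through it.
  // The lookup throws NoSuchMethodException when no such constructor exists.
  def instantiateNoArg(cls: Class[_]): Try[Any] =
    Try(cls.getDeclaredConstructor().newInstance())
}
```

Calling `ReflectionSketch.instantiateNoArg(classOf[ReflectionSketch.NeedsProps])` yields a `Failure` wrapping a `NoSuchMethodException`, while a class with a public no-arg constructor such as `java.lang.StringBuilder` instantiates successfully.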
   
   ### Details on Issue 2:
   I am trying to sync a Hive table on upsert in my unit tests. The test was for a 
non-partitioned table. As per the doc 
[Hudi_Non_partitioned](https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-HowdoIuseDeltaStreamerorSparkDataSourceAPItowritetoaNon-partitionedHudidataset?),
 we need to set the below properties for non-partitioned tables:
   ```
   hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
   hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
   ```
   
   However, even after setting the above properties, the Hive table is empty. 
I found a related issue, 
[GitHub_Issue](https://github.com/apache/hudi/issues/933), but it concerns 
complex record keys for non-partitioned tables. We also need complex-key support 
in non-partitioned tables, and I will track that issue for it.
   
   These are the hudi properties set on save:
   ```
   (hoodie.datasource.hive_sync.database,default)
   (hoodie.combine.before.insert,true)
   (hoodie.embed.timeline.server,false)
   (hoodie.insert.shuffle.parallelism,2)
   (hoodie.datasource.write.precombine.field,timestamp)
   (hoodie.datasource.hive_sync.use_jdbc,false)
   (hoodie.datasource.hive_sync.partition_extractor_class,org.apache.hudi.hive.NonPartitionedExtractor)
   (hoodie.datasource.hive_sync.table,TestHudiTable)
   (hoodie.index.type,GLOBAL_BLOOM)
   (hoodie.datasource.write.operation,UPSERT)
   (hoodie.datasource.hive_sync.enable,true)
   (hoodie.datasource.write.recordkey.field,id)
   (hoodie.table.name,TestHudiTable)
   (hoodie.datasource.write.table.type,COPY_ON_WRITE)
   (hoodie.datasource.write.hive_style_partitioning,true)
   (hoodie.datasource.write.keygenerator.class,org.apache.hudi.keygen.NonpartitionedKeyGenerator)
   (hoodie.upsert.shuffle.parallelism,2)
   (hoodie.cleaner.commits.retained,15)
   ```
   
   Results:
   ```
   spark.read.format("hudi").load(<BasePath>)
   +-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+---+---------+---------+-----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                    |id |timestamp|partition|data |
   +-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+---+---------+---------+-----+
   |20210528130615     |20210528130615_0_1  |id1               |                      |d739bf7d-9152-4565-ab01-9bf3e96d9997-0_0-29-31_20210528130615.parquet|id1|1        |p1       |data1|
   |20210528130615     |20210528130615_0_2  |id3               |                      |d739bf7d-9152-4565-ab01-9bf3e96d9997-0_0-29-31_20210528130615.parquet|id3|3        |p2       |data3|
   |20210528130615     |20210528130615_0_3  |id2               |                      |d739bf7d-9152-4565-ab01-9bf3e96d9997-0_0-29-31_20210528130615.parquet|id2|1        |p1       |data2|
   +-------------------+--------------------+------------------+----------------------+---------------------------------------------------------------------+---+---------+---------+-----+
   
   spark.table(<TableName>).show(false) 
   +---+---------+---------+----+
   |id |timestamp|partition|data|
   +---+---------+---------+----+
   +---+---------+---------+----+
   ```
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   ```
     def getSnapshotData: Dataset[TestData] = {
       Seq(
         TestData(id = "id1", timestamp = 1, partition = "p1", data = "data1"),
         TestData(id = "id2", timestamp = 1, partition = "p1", data = "data2"),
         TestData(id = "id3", timestamp = 3, partition = "p2", data = "data3")
       ).toDS
     }
   
   // Upsert into a non-partitioned table and sync it to Hive.
   val hudiOptions = Map(
     "hoodie.datasource.hive_sync.database" -> "default",
     "hoodie.combine.before.insert" -> "true",
     "hoodie.embed.timeline.server" -> "false",
     "hoodie.insert.shuffle.parallelism" -> "2",
     "hoodie.datasource.write.precombine.field" -> "timestamp",
     "hoodie.datasource.hive_sync.use_jdbc" -> "false",
     "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.NonPartitionedExtractor",
     "hoodie.datasource.hive_sync.table" -> "TestHudiTable",
     "hoodie.index.type" -> "GLOBAL_BLOOM",
     "hoodie.datasource.write.operation" -> "UPSERT",
     "hoodie.datasource.hive_sync.enable" -> "true",
     "hoodie.datasource.write.recordkey.field" -> "id",
     "hoodie.table.name" -> "TestHudiTable",
     "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
     "hoodie.datasource.write.hive_style_partitioning" -> "true",
     "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
     "hoodie.upsert.shuffle.parallelism" -> "2",
     "hoodie.cleaner.commits.retained" -> "15")
   
   snapshot.write.mode("overwrite").options(hudiOptions).format("hudi").save(<BasePath>)
   
   ```
   **Expected behavior**
   
   Expecting to see the records in Hive table for non partitioned dataset.
   
   **Environment Description**
   
   Hudi version : 0.8
   
   Spark version : 2.4
   
   Hive version : 2.5
   
   Hadoop version : 2.5
   
   Storage (HDFS/S3/GCS..) : Local testing. Production datasets in S3.
   
   Running on Docker? (yes/no) : no
   

