joao-miranda opened a new issue, #6552:
URL: https://github.com/apache/hudi/issues/6552

   **Describe the problem you faced**
   We are getting Full Load + CDC data from a RDBMS using AWS Database 
Migration Service into an S3 bucket. We then use Hudi in a Scala Glue Job to 
concatenate the files into a correct representation of the current status of 
the database. DMS adds two columns to the data: Op (with values null, I, U or 
D) and ts (timestamp of the operation). We are not using Hive or Avro.
   
   This works fine with Hudi 0.9.0 and Hudi 0.10.0. Once we try to upgrade to 
Hudi 0.11.0, 0.11.1 or 0.12.0, AWSDmsAvroPayload fails with the following error:
   ```
   33061 [consumer-thread-1] ERROR org.apache.hudi.io.HoodieWriteHandle  - 
Error writing record HoodieRecord{key=HoodieKey { recordKey=id:3 
partitionPath=}, currentLocation='null', newLocation='null'}
   java.util.NoSuchElementException: No value present in Option
           at org.apache.hudi.common.util.Option.get(Option.java:89)
           at 
org.apache.hudi.common.model.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:72)
           at 
org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
           at 
org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
           at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
           at 
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
           at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   Removing the PAYLOAD_CLASS_OPT_KEY option from the config makes it so that 
the Job doesn't fail, but the delete operations are not applied. No other 
payload class seems to work with the DMS format.
   
   **Steps to reproduce the behavior**
   **Dependencies:**
   ```
   "org.apache.hudi" %% "hudi-spark-bundle" % "2.12-0.12.0"
   "org.apache.hudi" %% "hudi-utilities-bundle" % "2.12-0.12.0"
   ```
   
   **Configuration used:**
   ```
   var hudiOptions = scala.collection.mutable.Map[String, String](
         HoodieWriteConfig.TABLE_NAME -> "hudiTableName",
         HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "true",
         DataSourceWriteOptions.OPERATION_OPT_KEY -> 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
         DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
         DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "primaryKeyField",
         DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY ->  "ts",
         DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> 
classOf[AWSDmsAvroPayload].getName,
         DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> 
classOf[CustomKeyGenerator].getName,
         DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, ""
       )
   ```
   
   **Following options are added if a partition key is defined:**
   ```
         hudiOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
"partitionKeyField")
         
hudiOptions.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
         hudiOptions.put(HoodieIndexConfig.INDEX_TYPE.key(), "GLOBAL_BLOOM")
         
hudiOptions.put(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(),
 "true")
         hudiOptions.put(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key(), 
"true")
   ```
   
   **Saved into a file:**
   ```
       // Write the DataFrame as a Hudi dataset
       mappedDF
         .dropDuplicates()
         .write
         .format("org.apache.hudi")
         .options(hudiOptions)
         .mode(SaveMode.Append)
         .save("targetDirectory")
   ```
   
   **Expected behavior**
   Data obtained from using Hudi reflects the data present in the DB.
   
   **Environment Description**
   
   - Hudi version : 0.12.0
   - Spark version : 3.1.1
   - Scala version: 2.12.15
   - AWS Glue version : 3.0.0


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to