amrishlal commented on code in PR #8978:
URL: https://github.com/apache/hudi/pull/8978#discussion_r1247016425


##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##########
@@ -41,13 +42,30 @@ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSc
     ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
     ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
 
-    if (newer.getData() == null) {
-      // Delete record
-      return Option.empty();
+    if (newer instanceof HoodieSparkRecord) {
+      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
+      if (newSparkRecord.isDeleted()) {
+        // Delete record
+        return Option.empty();
+      }
+    } else {
+      if (newer.getData() == null) {

Review Comment:
   Test cases in `TestMORDataSource` (for example, `testPayloadDelete`) fail with the following exception:
   
   ```
   1284819 [Executor task launch worker for task 2.0 in stage 107.0 (TID 136)] ERROR org.apache.spark.executor.Executor [] - Exception in task 2.0 in stage 107.0 (TID 136)
   java.lang.ClassCastException: org.apache.hudi.common.model.HoodieEmptyRecord cannot be cast to org.apache.hudi.common.model.HoodieSparkRecord
        at org.apache.hudi.HoodieSparkRecordMerger.merge(HoodieSparkRecordMerger.java:45) ~[classes/:?]
        at org.apache.hudi.RecordMergingFileIterator.merge(Iterators.scala:241) ~[classes/:?]
        at org.apache.hudi.RecordMergingFileIterator.hasNextInternal(Iterators.scala:218) ~[classes/:?]
        at org.apache.hudi.RecordMergingFileIterator.doHasNext(Iterators.scala:203) ~[classes/:?]
        at org.apache.hudi.util.CachingIterator.hasNext(CachingIterator.scala:36) ~[classes/:?]
        at org.apache.hudi.util.CachingIterator.hasNext$(CachingIterator.scala:36) ~[classes/:?]
        at org.apache.hudi.LogFileIterator.hasNext(Iterators.scala:61) ~[classes/:?]
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source) ~[?:?]
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_372]
   ```
   The `HoodieEmptyRecord` that leads to the `ClassCastException` is created in `HoodieMergedLogRecordScanner.java` at line 295:
   ```
   // Put the DELETE record
   if (recordType == HoodieRecordType.AVRO) {
     records.put(key, SpillableMapUtils.generateEmptyPayload(key,
         deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue(), getPayloadClassFQN()));
   } else {
     HoodieEmptyRecord record = new HoodieEmptyRecord<>(new HoodieKey(key,
         deleteRecord.getPartitionPath()), null, deleteRecord.getOrderingValue(), recordType);
     records.put(key, record);
   }
   ```
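   
   For intuition, here is a tiny self-contained reproduction of the failure mode using hypothetical stand-in classes (not the actual Hudi types): the engine-specific record and the delete marker both extend a common base, so an unconditional downcast in the merger fails at runtime.
   ```
   // Hypothetical stand-ins for HoodieRecord / HoodieSparkRecord / HoodieEmptyRecord.
   abstract class Record {}
   class SparkRecord extends Record {}
   class EmptyRecord extends Record {}

   public class CastFailureSketch {
     public static void main(String[] args) {
       // What the scanner stores for a DELETE when recordType != AVRO.
       Record deleteMarker = new EmptyRecord();
       // What an unguarded merger effectively does: throws ClassCastException here.
       SparkRecord spark = (SparkRecord) deleteMarker;
     }
   }
   ```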
   
   Based on offline discussion, we decided to continue with an `instanceof` check before casting to `HoodieSparkRecord`, as sketched below.
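   
   A minimal, self-contained sketch of that pattern, again with hypothetical stand-in classes rather than the real Hudi types: the cast is guarded by `instanceof`, and any record that is not a `HoodieSparkRecord` analogue falls back to the generic null-data delete check.
   ```
   abstract class Record {
     abstract Object getData();
   }

   class SparkRecord extends Record {
     private final Object data;
     private final boolean deleted;
     SparkRecord(Object data, boolean deleted) { this.data = data; this.deleted = deleted; }
     boolean isDeleted() { return deleted; }
     @Override Object getData() { return data; }
   }

   class EmptyRecord extends Record {
     @Override Object getData() { return null; } // delete markers carry no data
   }

   public class InstanceofGuardSketch {
     // Mirrors the agreed delete check: instanceof before the cast,
     // with a null-data fallback for every other record shape.
     static boolean isDelete(Record newer) {
       if (newer instanceof SparkRecord) {
         return ((SparkRecord) newer).isDeleted();
       }
       return newer.getData() == null;
     }

     public static void main(String[] args) {
       System.out.println(isDelete(new SparkRecord("row", false))); // false
       System.out.println(isDelete(new EmptyRecord()));             // true, no ClassCastException
     }
   }
   ```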


