amrishlal commented on code in PR #8978: URL: https://github.com/apache/hudi/pull/8978#discussion_r1247016425
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##########

```diff
@@ -41,13 +42,30 @@ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSc
     ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
     ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
-    if (newer.getData() == null) {
-      // Delete record
-      return Option.empty();
+    if (newer instanceof HoodieSparkRecord) {
+      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
+      if (newSparkRecord.isDeleted()) {
+        // Delete record
+        return Option.empty();
+      }
+    } else {
+      if (newer.getData() == null) {
```

Review Comment:
Test-case failures occur in `TestMORDataSource` (`testPayloadDelete`, for example), where the tests fail with the following exception:

```
1284819 [Executor task launch worker for task 2.0 in stage 107.0 (TID 136)] ERROR org.apache.spark.executor.Executor [] - Exception in task 2.0 in stage 107.0 (TID 136)
java.lang.ClassCastException: org.apache.hudi.common.model.HoodieEmptyRecord cannot be cast to org.apache.hudi.common.model.HoodieSparkRecord
	at org.apache.hudi.HoodieSparkRecordMerger.merge(HoodieSparkRecordMerger.java:45) ~[classes/:?]
	at org.apache.hudi.RecordMergingFileIterator.merge(Iterators.scala:241) ~[classes/:?]
	at org.apache.hudi.RecordMergingFileIterator.hasNextInternal(Iterators.scala:218) ~[classes/:?]
	at org.apache.hudi.RecordMergingFileIterator.doHasNext(Iterators.scala:203) ~[classes/:?]
	at org.apache.hudi.util.CachingIterator.hasNext(CachingIterator.scala:36) ~[classes/:?]
	at org.apache.hudi.util.CachingIterator.hasNext$(CachingIterator.scala:36) ~[classes/:?]
	at org.apache.hudi.LogFileIterator.hasNext(Iterators.scala:61) ~[classes/:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.jar:3.3.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_372]
```

The `HoodieEmptyRecord` that leads to the `ClassCastException` is created in `HoodieMergedLogRecordScanner.java` at Line 295:

```java
// Put the DELETE record
if (recordType == HoodieRecordType.AVRO) {
  records.put(key, SpillableMapUtils.generateEmptyPayload(key,
      deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue(), getPayloadClassFQN()));
} else {
  HoodieEmptyRecord record = new HoodieEmptyRecord<>(
      new HoodieKey(key, deleteRecord.getPartitionPath()), null, deleteRecord.getOrderingValue(), recordType);
  records.put(key, record);
}
```

Based on an offline discussion, we decided to keep the `instanceof` check before casting to `HoodieSparkRecord`.
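The failure mode boils down to an instanceof-before-cast guard. The sketch below uses minimal stand-in types (`Rec`, `SparkRec`, `EmptyRec` are hypothetical names, not the real Hudi classes) to show the pattern: a merger may receive either a Spark-backed record or an empty delete marker, so an unconditional cast would throw, while the guarded version falls back to the null-data convention for any other record type.

```java
// Hedged sketch with stand-in types, not the real Hudi hierarchy.
public class MergerGuardSketch {

  interface Rec {
    Object getData();
  }

  // Stand-in for HoodieSparkRecord: carries data and an explicit delete flag.
  static final class SparkRec implements Rec {
    private final Object data;
    private final boolean deleted;
    SparkRec(Object data, boolean deleted) {
      this.data = data;
      this.deleted = deleted;
    }
    public Object getData() { return data; }
    boolean isDeleted() { return deleted; }
  }

  // Stand-in for HoodieEmptyRecord: a delete marker with no payload.
  static final class EmptyRec implements Rec {
    public Object getData() { return null; }
  }

  // Only cast after an instanceof test; every other record type is treated
  // as a delete when it carries no data.
  static boolean isDelete(Rec newer) {
    if (newer instanceof SparkRec) {
      return ((SparkRec) newer).isDeleted();
    }
    return newer.getData() == null;
  }

  public static void main(String[] args) {
    // An empty (delete-marker) record no longer triggers a ClassCastException.
    System.out.println(isDelete(new EmptyRec()));              // true
    System.out.println(isDelete(new SparkRec("row", false)));  // false
    System.out.println(isDelete(new SparkRec(null, true)));    // true
  }
}
```

The same three cases mirror the discussion above: the empty record path is the one that previously blew up on the cast at `HoodieSparkRecordMerger.java:45`.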