a-uddhav opened a new issue #1669:
URL: https://github.com/apache/hudi/issues/1669


   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)? Yes
   
   - Join the mailing list to engage in conversations and get faster support at [email protected].
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   I am writing records to a MOR table using the Spark DataSource, overriding `preCombine()` and `combineAndGetUpdateValue()` instead of relying on the default payload behavior. However, the Spark application runs into an unexpected error during compaction:
   
   ```
   org.apache.hudi.exception.HoodieIOException: IOException when reading log 
file 
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244)
        at 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
        at 
org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126)
        at 
org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98)
   
   ```
   
   **To Reproduce**
   
   Steps to reproduce the behavior (I am running the Spark application in IntelliJ):
   1. Write the first batch of records to the MOR table in OVERWRITE mode. I used the following records:
   ```
   // record class
   import java.sql.Timestamp

   case class RecordClass(id: Int,
                          name: String,
                          age: Int,
                          event_type: Int,
                          created_at: Timestamp)

   // first batch of records
   val records = Seq(
     RecordClass(1, "Uddhav-1", 30, 0, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav0", 40, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 31, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(3, "Uddhav2", 32, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(4, "Uddhav3", 33, 0, Timestamp.valueOf("2023-09-07 00:00:00"))
   )
   ```
   with the following Hudi options:
   ```
   // hudi options
   val hudiOptions = Map(
     HoodieWriteConfig.TABLE_NAME -> "<tblName>",
     DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
     DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
     DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "<date part of the created_at column>",
     DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "created_at",
     DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> "<payload class>"
   )
   ```
   and the following custom payload class:
   ```
   // payload class
   // (imports shown for completeness; Hudi package paths are as of 0.5.x and may differ in other releases)
   import org.apache.avro.Schema
   import org.apache.avro.generic.{GenericRecord, IndexedRecord}
   import org.apache.hudi.avro.HoodieAvroUtils
   import org.apache.hudi.common.model.{BaseAvroPayload, HoodieRecordPayload}
   import org.apache.hudi.common.util
   import org.apache.hudi.common.util.Option

   class RecordPayloadClass(val genericRecord: GenericRecord, val comparable: Comparable[Int])
     extends BaseAvroPayload(genericRecord, comparable)
       with HoodieRecordPayload[RecordPayloadClass] {

     // called when two incoming records share the same key: keep the "latest" one
     override def preCombine(another: RecordPayloadClass): RecordPayloadClass = {
       val latestRecord: GenericRecord = pickLatestRecord(this.genericRecord, another.genericRecord)
       if (latestRecord.equals(this.genericRecord)) this else another
     }

     // called on update: merge the incoming record with the record on disk
     override def combineAndGetUpdateValue(currentValue: IndexedRecord, schema: Schema): util.Option[IndexedRecord] = {
       val diskRecord: GenericRecord = currentValue.asInstanceOf[GenericRecord]
       val inFlightRecord: GenericRecord = this.genericRecord

       Option.of(pickLatestRecord(diskRecord, inFlightRecord))
     }

     override def getInsertValue(schema: Schema): util.Option[IndexedRecord] =
       Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema))

     // prefer the record with the higher event_type; break ties on the higher age
     private def pickLatestRecord(diskRecord: GenericRecord, inFlightRecord: GenericRecord): GenericRecord = {
       val diskEventType: Int = diskRecord.get("event_type").asInstanceOf[Int]
       val inFlightEventType: Int = inFlightRecord.get("event_type").asInstanceOf[Int]

       if (inFlightEventType > diskEventType) inFlightRecord

       else if (inFlightEventType == diskEventType) {
         val diskAge: Int = diskRecord.get("age").asInstanceOf[Int]
         val inFlightAge: Int = inFlightRecord.get("age").asInstanceOf[Int]
         // on an event_type tie, prefer the in-flight record
         if (inFlightAge >= diskAge) inFlightRecord else diskRecord
       }

       else diskRecord
     }
   }
   ```
   This write succeeds.
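   
   For completeness, the write itself is a plain Spark DataSource write, roughly as sketched below (`spark` is the SparkSession and `basePath` is a placeholder for the local table path; `records` and `hudiOptions` are the values shown above):
   ```
   // sketch of the step-1 write (Hudi 0.5.2 / Spark 2.4.4)
   import org.apache.spark.sql.SaveMode

   val df = spark.createDataFrame(records)

   df.write
     .format("org.apache.hudi")
     .options(hudiOptions)
     .mode(SaveMode.Overwrite)
     .save(basePath)
   ```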
   
   2. Then write another batch of records in APPEND mode. I used the following records:
   ```
   val records2 = Seq(
     RecordClass(1, "Uddhav4", 41, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav5", 42, 1, Timestamp.valueOf("2020-09-07 00:00:00")),

     RecordClass(3, "Uddhav2", 32, 0, Timestamp.valueOf("2022-09-07 00:00:00")),

     RecordClass(4, "Uddhav3", 33, 2, Timestamp.valueOf("2023-09-07 00:00:00")),

     RecordClass(1, "Uddhav5", 42, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav5", 43, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav5", 44, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav5", 45, 1, Timestamp.valueOf("2020-09-07 00:00:00")),
     RecordClass(1, "Uddhav5", 46, 1, Timestamp.valueOf("2020-09-07 00:00:00")),

     RecordClass(3, "Uddhav2", 32, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav21", 33, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav22", 34, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav23", 35, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav24", 36, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav25", 38, 1, Timestamp.valueOf("2022-09-07 00:00:00")),
     RecordClass(3, "Uddhav26", 39, 1, Timestamp.valueOf("2022-09-07 00:00:00")),

     RecordClass(2, "Uddhav1", 31, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 32, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 33, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 34, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 35, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 36, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 37, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 38, 0, Timestamp.valueOf("2021-09-07 00:00:00")),
     RecordClass(2, "Uddhav1", 39, 0, Timestamp.valueOf("2021-09-07 00:00:00"))
   )
   ```
   This write fails with the aforementioned `HoodieIOException` (the stack trace shows it is raised from the inline compaction triggered by the commit).
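   
   The step-2 write is identical to step 1 except for the save mode (same sketch assumptions as above):
   ```
   // sketch of the step-2 write; only the save mode changes
   val df2 = spark.createDataFrame(records2)

   df2.write
     .format("org.apache.hudi")
     .options(hudiOptions)
     .mode(SaveMode.Append)
     .save(basePath)
   ```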
   
   **Expected behavior**
   
   I expected the record with the higher `event_type` (and, on an `event_type` tie, the higher `age`) to be written to the Hudi table. Instead, I ran into the exception above. The same logic works fine when the table type is COW.
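   
   Concretely, if I have traced the payload logic correctly, the table should contain the following rows per key after step 2 (a worked example derived from `pickLatestRecord` and the two batches above):
   ```
   // expected final rows (id, name, age, event_type):
   // id=1 -> ("Uddhav5", 46, 1)   // event_type tie with the base record; highest age wins
   // id=2 -> ("Uddhav1", 39, 0)   // event_type tie; highest age wins
   // id=3 -> ("Uddhav26", 39, 1)  // event_type 1 beats 0; then highest age wins
   // id=4 -> ("Uddhav3", 33, 2)   // event_type 2 beats the existing 0
   ```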
   
   **Environment Description**
   
   * Hudi version : 0.5.2
   
   * Spark version : 2.4.4
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : Local FS
   
   * Running on Docker? (yes/no) : No
   
   
   
   **Stacktrace**
   
   ```
   Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
to stage failure: Task 1 in stage 28.0 failed 1 times, most recent failure: 
Lost task 1.0 in stage 28.0 (TID 13534, localhost, executor driver): 
org.apache.hudi.exception.HoodieIOException: IOException when reading log file 
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244)
        at 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
        at 
org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126)
        at 
org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98)
        at 
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:378)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1111)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        at 
org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
        at 
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
        at 
org.apache.hudi.client.HoodieWriteClient.doCompactionCommit(HoodieWriteClient.java:1123)
        at 
org.apache.hudi.client.HoodieWriteClient.commitCompaction(HoodieWriteClient.java:1091)
        at 
org.apache.hudi.client.HoodieWriteClient.runCompaction(HoodieWriteClient.java:1074)
        at 
org.apache.hudi.client.HoodieWriteClient.compact(HoodieWriteClient.java:1045)
        at 
org.apache.hudi.client.HoodieWriteClient.lambda$forceCompact$12(HoodieWriteClient.java:1160)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
        at 
org.apache.hudi.client.HoodieWriteClient.forceCompact(HoodieWriteClient.java:1157)
        at 
org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:502)
        at 
org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:157)
        at 
org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:101)
        at 
org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:92)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.checkWriteStatus(HoodieSparkSqlWriter.scala:262)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:184)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
        at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
        at 
com.paypay.daas.uddhav.CustomPreCombine2$.delayedEndpoint$com$paypay$daas$uddhav$CustomPreCombine2$1(CustomPreCombine2.scala:77)
        at 
com.paypay.daas.uddhav.CustomPreCombine2$delayedInit$body.apply(CustomPreCombine2.scala:10)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at 
com.paypay.daas.uddhav.CustomPreCombine2$.main(CustomPreCombine2.scala:10)
        at 
com.paypay.daas.uddhav.CustomPreCombine2.main(CustomPreCombine2.scala)
   Caused by: org.apache.hudi.exception.HoodieIOException: IOException when 
reading log file 
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244)
        at 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
        at 
org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.compact(HoodieMergeOnReadTableCompactor.java:126)
        at 
org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor.lambda$compact$644ebad7$1(HoodieMergeOnReadTableCompactor.java:98)
        at 
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:378)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1111)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   
   ```
   
   

