Regarding (1): since the exception happens inside the Parquet reader (outside Hudi), can you use Spark 2.3 (instead of Spark 2.4, which pulls in a particular version of Avro/Parquet) to create and ingest a brand-new dataset and try it out? That would hopefully help isolate the issue.
Regarding (2): +1 on Vinoth's suggestion. But if you are very sure, can you see if
there is any pattern around the missing records? Are the missing records all in the
same partition?

Balaji.V

On Mon, Nov 11, 2019 at 1:30 PM Zhengxiang Pan <[email protected]> wrote:

> Hi,
>
> The snippet for the issue is here:
> https://gist.github.com/zxpan/c5e989958d7688026f1679e53d2fca44
>
> 1) The write script simulates migrating an existing data frame (saved in
>    the /tmp/hudi-testing/inserts parquet) to Hudi.
> 2) The update script simulates incrementally updating the existing dataset
>    (updates saved in the /tmp/hudi-testing/updates parquet); this is where
>    the issue occurs.
>
> See the attached inserts and updates parquet files.
>
> Your help is appreciated.
> Thanks
>
> On Mon, Nov 11, 2019 at 11:23 AM Zhengxiang Pan <[email protected]> wrote:
>
>> Thanks for the quick response. I will try to create a snippet to
>> reproduce the issue.
>>
>> For number 2), I am aware of the de-dup behavior. I am pretty sure the
>> precombine key is unique.
>>
>> Thanks
>>
>> On Mon, Nov 11, 2019 at 8:46 AM Vinoth Chandar <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> On 1: I am wondering if it's related to
>>> https://issues.apache.org/jira/browse/HUDI-83 , i.e. support for
>>> timestamps. If you can give us a small snippet to reproduce the problem,
>>> that would be great.
>>>
>>> On 2: not sure what's going on; there are no size limitations. Please
>>> check that your precombine field and keys are correct. For example, if
>>> you pick a field/value that is the same in all records, then precombine
>>> will crunch them down to just one record, because that's what we ask it
>>> to do.
>>>
>>> On Sun, Nov 10, 2019 at 6:46 PM Zhengxiang Pan <[email protected]>
>>> wrote:
>>>
>>> > Hi,
>>> > I am new to Hudi; my first attempt is to convert my existing dataframe
>>> > to a Hudi-managed dataset. I followed the Quick Start guide and
>>> > Option (2) or (3) in the Migration Guide.
>>> > I ran into two issues:
>>> >
>>> > 1) I got the following error when afterwards using Append mode to
>>> > upsert the data:
>>> >
>>> > org.apache.spark.SparkException: Job aborted due to stage failure:
>>> > Task 4 in stage 23.0 failed 4 times, most recent failure: Lost task
>>> > 4.3 in stage 23.0 (TID 74, tkcnode49.alphonso.tv, executor 7):
>>> > org.apache.hudi.exception.HoodieUpsertException: Error upserting
>>> > bucketType UPDATE for partition :4
>>> >   at org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:261)
>>> >   at org.apache.hudi.HoodieWriteClient.lambda$upsertRecordsInternal$507693af$1(HoodieWriteClient.java:428)
>>> >   at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>>> >   at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>>> >   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>>> >   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>>> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>> >   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>>> >   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>>> >   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>>> >   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>> >   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>> >   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>> >   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>> >   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>>> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>>> >   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>> >   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>> >   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>> >   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>> >   at org.apache.spark.scheduler.Task.run(Task.scala:121)
>>> >   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>>> >   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>> >   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>>> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> >   at java.lang.Thread.run(Thread.java:748)
>>> >
>>> > I noticed that the "Date" type is converted to a "Long" type in the
>>> > Hudi dataset.
>>> >
>>> > As a workaround, I saved my dataframe to JSONL and read it back before
>>> > saving it to the Hudi-managed dataset.
>>> >
>>> > Are there any requirements to convert the data schema explicitly from
>>> > my original data frame?
>>> >
>>> > 2) Even when I managed to get around the first issue, the number of
>>> > records in the Hudi-managed data is way less than in my original data
>>> > frame.
>>> >
>>> > Is there any size limitation on a Hudi dataset?
>>> >
>>> > Thanks
>>> >
>>>
>>
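Vinoth's point about precombine in this thread can be made concrete: within a single write batch, records sharing the same record key are collapsed to the one with the largest precombine-field value, so a non-unique key silently shrinks the output. Below is a minimal plain-Python sketch of that semantics (illustrative only, not Hudi's actual implementation; the `id`/`ts` field names are assumptions):

```python
def precombine(records, record_key, precombine_field):
    """Keep, for each record key, the record with the largest precombine value."""
    latest = {}
    for rec in records:
        key = rec[record_key]
        if key not in latest or rec[precombine_field] > latest[key][precombine_field]:
            latest[key] = rec
    return list(latest.values())

# If the record key is accidentally non-unique, the batch shrinks:
batch = [
    {"id": "a", "ts": 2, "val": "newer"},
    {"id": "a", "ts": 1, "val": "older"},  # same key "a": dropped (lower ts)
    {"id": "b", "ts": 1, "val": "only"},
]
deduped = precombine(batch, record_key="id", precombine_field="ts")
# three input records collapse to two -- the "missing records" symptom
```

This is why checking that the record key (not just the precombine field) is unique per logical record is the first thing to verify when counts come up short.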

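On the "Date converted to Long" observation in the thread: Avro logical types (which Hudi's record schemas are built on) represent dates and timestamps as plain integers — a `date` is days since the Unix epoch and `timestamp-millis` is milliseconds since the epoch — which is one plausible explanation for a Date column surfacing as a numeric type. A small plain-Python illustration of those encodings (an assumption about the encoding in play, not a confirmed diagnosis of this report):

```python
from datetime import date, datetime, timezone

# Avro's "date" logical type stores days since the Unix epoch (an int),
# and "timestamp-millis" stores milliseconds since the epoch (a long).
d = date(2019, 11, 11)
days_since_epoch = (d - date(1970, 1, 1)).days

ts = datetime(2019, 11, 11, tzinfo=timezone.utc)
millis_since_epoch = int(ts.timestamp() * 1000)
```

If the reader and writer disagree on which logical type backs the column, the raw integer is all that survives, which matches the HUDI-83 timestamp-support discussion referenced above.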