[ https://issues.apache.org/jira/browse/HIVE-18301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309361#comment-16309361 ]
liyunzhang commented on HIVE-18301: ----------------------------------- Here is an update about the NPE. In the normal case, when the RDD cache is enabled, the stack trace of initIOContext, which will initialize the input path via ExecMapperContext#setCurrentInputPath, is {code} org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.initIOContext(HiveContextAwareRecordReader.java:175) org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.initIOContext(HiveContextAwareRecordReader.java:211) org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.<init>(CombineHiveRecordReader.java:101) sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) java.lang.reflect.Constructor.newInstance(Constructor.java:423) org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.initNextRecordReader(HadoopShimsSecure.java:257) org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.<init>(HadoopShimsSecure.java:217) org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileInputFormatShim.getRecordReader(HadoopShimsSecure.java:346) org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getRecordReader(CombineHiveInputFormat.java:712) org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:246) org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209) org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) org.apache.spark.rdd.RDD.iterator(RDD.scala:283) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332) org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330) org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919) 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910) org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910) org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668) org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) org.apache.spark.rdd.RDD.iterator(RDD.scala:281) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332) org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330) org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919) org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910) org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910) org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668) org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) org.apache.spark.rdd.RDD.iterator(RDD.scala:281) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) org.apache.spark.scheduler.Task.run(Task.scala:85) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) {code} the stacktrace of ExecMapperContext#getCurrentInputPath {code} org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext.getCurrentInputPath(ExecMapperContext.java:113) org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:512) org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187) 
org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:543) org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:136) org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48) org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27) org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85) scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213) org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919) org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910) org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910) org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668) org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) org.apache.spark.rdd.RDD.iterator(RDD.scala:281) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) org.apache.spark.scheduler.Task.run(Task.scala:85) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) currentInputPath is hdfs://bdpe42 java.lang.Thread.getStackTrace(Thread.java:1552) org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext.getCurrentInputPath(ExecMapperContext.java:113) org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:512) 
org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187) org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:543) org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:136) org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48) org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27) org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85) scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213) org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919) org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910) org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910) org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668) org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) org.apache.spark.rdd.RDD.iterator(RDD.scala:281) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) org.apache.spark.scheduler.Task.run(Task.scala:85) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) {code} Actually, they are in the same thread, so the NPE is not thrown even though {{org.apache.hadoop.hive.ql.io.IOContextMap#sparkThreadLocal}} is a ThreadLocal variable. 
In the NPE case the stacktrace to ExecMapperContext#getCurrentInputPath {code} 742 18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:512) 743 18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187) 744 18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:543) 745 18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:136) 746 18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48) 747 18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27) 748 18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85) 749 18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) 750 18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) 751 18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 752 18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) {code} ExecMapperContext#getCurrentInputPath and 
ExecMapperContext#setCurrentInputPath are called in different threads, so the NPE is thrown. I don't know why the call path in the normal case is {code} MemoryStore.putIteratorAsValues->HiveMapFunctionResultList.processNextRecord {code} while in the NPE case it is {code} BypassMergeSortShuffleWriter.write->HiveMapFunctionResultList.processNextRecord {code} > Investigate to enable MapInput cache in Hive on Spark > ----------------------------------------------------- > > Key: HIVE-18301 > URL: https://issues.apache.org/jira/browse/HIVE-18301 > Project: Hive > Issue Type: Bug > Reporter: liyunzhang > Assignee: liyunzhang > > Previously, an IOContext problem was found in MapTran when the Spark RDD cache is enabled > in HIVE-8920, > so we disabled the RDD cache in MapTran at > [SparkPlanGenerator|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java#L202]. > The problem is that IOContext seems not to be initialized correctly in the Spark yarn > client/cluster mode, which causes an exception like > {code} > Job aborted due to stage failure: Task 93 in stage 0.0 failed 4 times, most > recent failure: Lost task 93.3 in stage 0.0 (TID 616, bdpe48): > java.lang.RuntimeException: Error processing row: > java.lang.NullPointerException > at > org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:165) > at > org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48) > at > org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27) > at > org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85) > at > scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) > at > org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.hadoop.hive.ql.exec.AbstractMapOperator.getNominalPath(AbstractMapOperator.java:101) > at > org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:516) > at > org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187) > at > org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:546) > at > org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:152) > ... 12 more > Driver stacktrace: > {code} > in yarn client/cluster mode, sometimes > [ExecMapperContext#currentInputPath|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java#L109] > is null when the RDD cache is enabled. -- This message was sent by Atlassian JIRA (v6.4.14#64029)