[ https://issues.apache.org/jira/browse/HIVE-18301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309361#comment-16309361 ]

liyunzhang commented on HIVE-18301:
-----------------------------------

Here is some update about the NPE.

In the normal case, when RDD cache is enabled, the stack trace of initIOContext, which initializes ExecMapperContext#setCurrentInputPath, is:
{code}
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.initIOContext(HiveContextAwareRecordReader.java:175)
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.initIOContext(HiveContextAwareRecordReader.java:211)
org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.<init>(CombineHiveRecordReader.java:101)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.initNextRecordReader(HadoopShimsSecure.java:257)
org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.<init>(HadoopShimsSecure.java:217)
org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileInputFormatShim.getRecordReader(HadoopShimsSecure.java:346)
org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getRecordReader(CombineHiveInputFormat.java:712)
org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:246)
org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{code}

The stack trace of ExecMapperContext#getCurrentInputPath is:
{code}
org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext.getCurrentInputPath(ExecMapperContext.java:113)
org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:512)
org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187)
org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:543)
org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:136)
org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

currentInputPath is hdfs://bdpe42
java.lang.Thread.getStackTrace(Thread.java:1552)
(second invocation on the same thread; the remaining frames are identical to the trace above)
{code}

Actually they run in the same thread, so the NPE is not thrown even though 
{{org.apache.hadoop.hive.ql.io.IOContextMap#sparkThreadLocal}} is a ThreadLocal 
variable.
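
To make the ThreadLocal point concrete, here is a minimal standalone sketch (the class name and path string are made up for illustration; this is not Hive's actual IOContextMap): a value set on one thread is visible to later reads on that same thread, but a read from another thread gets null, which is exactly what turns into the NPE downstream.
{code}
// Minimal sketch of the ThreadLocal behavior (hypothetical class, not Hive code).
public class ThreadLocalVisibilityDemo {
    // Plays the role of IOContextMap#sparkThreadLocal / currentInputPath.
    private static final ThreadLocal<String> currentInputPath = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        // The "setCurrentInputPath" side.
        currentInputPath.set("hdfs://example-host/warehouse/some_table");

        // Same thread: the value is visible, as in the normal case above.
        System.out.println("same thread:  " + currentInputPath.get());

        // Different thread: get() returns null, as in the NPE case where
        // getCurrentInputPath runs on a different executor worker thread.
        Thread other = new Thread(
            () -> System.out.println("other thread: " + currentInputPath.get()));
        other.start();
        other.join();
    }
}
{code}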

In the NPE case, the stack trace to ExecMapperContext#getCurrentInputPath is:
{code}
18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:512)
18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187)
18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:543)
18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:136)
18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
18/01/03 01:01:32 INFO Executor task launch worker-1 ExecMapperContext: org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
{code}

ExecMapperContext#getCurrentInputPath and ExecMapperContext#setCurrentInputPath 
run in different threads, so the NPE is thrown. I don't know why the call chain 
in the normal case is
{code}
MemoryStore.putIteratorAsValues -> HiveMapFunctionResultList.processNextRecord
{code}
while in the NPE case it is
{code}
BypassMergeSortShuffleWriter.write -> HiveMapFunctionResultList.processNextRecord
{code}
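
One way to verify the thread mismatch is to instrument both ExecMapperContext#setCurrentInputPath and #getCurrentInputPath with the current thread name plus a stack trace, which is roughly how the traces above were collected. A sketch under assumptions: slf4j logging, and CallSiteLogger/logCallSite are hypothetical names, not existing Hive code.
{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical diagnostic helper (not part of Hive). Calling it from both the
// setter and the getter records which thread executes each call site.
public final class CallSiteLogger {
  private static final Logger LOG = LoggerFactory.getLogger(CallSiteLogger.class);

  public static void logCallSite(String what, Object currentInputPath) {
    StringBuilder sb = new StringBuilder(what)
        .append(" on thread ").append(Thread.currentThread().getName())
        .append(", currentInputPath is ").append(currentInputPath).append('\n');
    // Frame 0 is Thread.getStackTrace and frame 1 is this method; skip both.
    StackTraceElement[] frames = Thread.currentThread().getStackTrace();
    for (int i = 2; i < frames.length; i++) {
      sb.append("  ").append(frames[i]).append('\n');
    }
    LOG.info(sb.toString());
  }
}
{code}
If the analysis above is right, an NPE run should show the set and the get on different "Executor task launch worker" threads, while a successful run shows both on the same one.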

> Investigate to enable MapInput cache in Hive on Spark
> -----------------------------------------------------
>
>                 Key: HIVE-18301
>                 URL: https://issues.apache.org/jira/browse/HIVE-18301
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang
>            Assignee: liyunzhang
>
> Previously, an IOContext problem was found in MapTran when Spark RDD cache was 
> enabled, in HIVE-8920, so we disabled RDD cache in MapTran at 
> [SparkPlanGenerator|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java#L202].
> The problem is that IOContext seems not to be initialized correctly in the spark 
> yarn client/cluster mode, which causes an exception like
> {code}
> Job aborted due to stage failure: Task 93 in stage 0.0 failed 4 times, most recent failure: Lost task 93.3 in stage 0.0 (TID 616, bdpe48): java.lang.RuntimeException: Error processing row: java.lang.NullPointerException
>       at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:165)
>       at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
>       at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
>       at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
>       at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>       at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>       at org.apache.spark.scheduler.Task.run(Task.scala:85)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>       at org.apache.hadoop.hive.ql.exec.AbstractMapOperator.getNominalPath(AbstractMapOperator.java:101)
>       at org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:516)
>       at org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187)
>       at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:546)
>       at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:152)
>       ... 12 more
> Driver stacktrace:
> {code}
> In yarn client/cluster mode, sometimes 
> [ExecMapperContext#currentInputPath|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java#L109]
> is null when RDD cache is enabled.


