[ 
https://issues.apache.org/jira/browse/HIVE-18301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350077#comment-16350077
 ] 

liyunzhang edited comment on HIVE-18301 at 2/2/18 9:54 AM:
-----------------------------------------------------------

[~xuefuz]:
{quote}Instead of putting the input patch in each row, like what the patch is 
proposing, could we send a serialized IOContext object as a special row 
whenever the content of the object changes?
{quote}
This is good idea. IOContext is bind to split not to bind to each record. So I 
changed to following idea.
{code:java}
 inputRDD1                inputRDD2
        |CopyFunction            | CopyFunction
    CopyRDD1                CopyRDD2
        |                       |
       MT_11                       MT_12
        |                       |
       RT_1                         RT_2
         \                      /
                     Union  
{code}
in the CopyRDD1, I only store IOContext as the tuple_1() of result when the 
IOContext#getInputPath is changed and store null in other situation. Thus it 
will reduce the size  of data increment of this solution. In MT_12, it 
initializes the IOContext when IOContext#getInputPath is null, once 
IOContext#getInputPath has value, we need not initialize it again in the same 
thread.  Any suggestion?


was (Author: kellyzly):
[~xuefuz]:
{quote}
 Instead of putting the input patch in each row, like what the patch is 
proposing, could we send a serialized IOContext object as a special row 
whenever the content of the object changes? 
{quote}
This is good idea.  IOContext is bind to split not to bind to each record.  So 
I changed to following idea.
{code}
 inputRDD1                inputRDD2
        |CopyFunction            | CopyFunction
    CopyRDD1                CopyRDD2
        |                       |
       MT_11                       MT_12
        |                       |
       RT_1                         RT_2
         \                      /
                     Union  
{code}

in the CopyRDD1, I only store IOContext as the tuple_1() of result when the 
IOContext#getInputPath is changed and store null in other situation. Thus it 
will reduce the size of data size increment of this solution. In MT_12, it 
initialize the IOContext when IOContext#getInputPath is null, once  
IOContext#getInputPath has value, we need not initialize it again in the same 
thread.

> Investigate to enable MapInput cache in Hive on Spark
> -----------------------------------------------------
>
>                 Key: HIVE-18301
>                 URL: https://issues.apache.org/jira/browse/HIVE-18301
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang
>            Assignee: liyunzhang
>            Priority: Major
>         Attachments: HIVE-18301.1.patch, HIVE-18301.2.patch, HIVE-18301.patch
>
>
> Before IOContext problem is found in MapTran when spark rdd cache is enabled 
> in HIVE-8920.
> so we disabled rdd cache in MapTran at 
> [SparkPlanGenerator|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java#L202].
>   The problem is IOContext seems not initialized correctly in the spark yarn 
> client/cluster mode and caused the exception like 
> {code}
> Job aborted due to stage failure: Task 93 in stage 0.0 failed 4 times, most 
> recent failure: Lost task 93.3 in stage 0.0 (TID 616, bdpe48): 
> java.lang.RuntimeException: Error processing row: 
> java.lang.NullPointerException
>       at 
> org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:165)
>       at 
> org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
>       at 
> org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
>       at 
> org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
>       at 
> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>       at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>       at org.apache.spark.scheduler.Task.run(Task.scala:85)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.hadoop.hive.ql.exec.AbstractMapOperator.getNominalPath(AbstractMapOperator.java:101)
>       at 
> org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:516)
>       at 
> org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187)
>       at 
> org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:546)
>       at 
> org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:152)
>       ... 12 more
> Driver stacktrace:
> {code}
> in yarn client/cluster mode, sometimes 
> [ExecMapperContext#currentInputPath|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java#L109]
>  is null when rdd cach is enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to