[ https://issues.apache.org/jira/browse/HIVE-18301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330190#comment-16330190 ]
liyunzhang commented on HIVE-18301:
-----------------------------------

[~lirui]:
{quote}My understanding is that if the HadoopRDD is cached, the records are not produced by the record reader and the IOContext is not populated. Therefore the information in the IOContext will be unavailable, e.g. the input path. This may cause problems because some operators need to take certain actions when the input file changes ({{Operator::cleanUpInputFileChanged}}). So basically my point is that we have to figure out the scenarios where the IOContext is necessary, and then decide whether we should disable caching in such cases.{quote}

Yes, if the HadoopRDD is cached, the following chain is never invoked:
{code:java}
CombineHiveRecordReader#init
  -> HiveContextAwareRecordReader.initIOContext
    -> IOContext.setInputPath
{code}
Instead, the cached result is passed straight to MapOperator#process(Writable value), and an NPE is thrown because IOContext.getInputPath returns null at that point. For now I have modified MapOperator#process(Writable value) as in [this commit|https://github.com/kellyzly/hive/commit/e81b7df572e2c543095f55dd160b428c355da2fb].

My questions are:
1. When {{context.getIoCxt().getInputPath() == null}}, I believe the record comes from the cache rather than from CombineHiveRecordReader, so we do not need to call MapOperator#cleanUpInputFileChanged. That method is only needed when one mapper scans multiple files (e.g. CombineFileInputFormat) or multiple partitions: in those cases the input path changes and {{cleanUpInputFileChanged}} must reinitialize [some variables|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java#L532], but no such reinitialization is needed for a cached record. Is my understanding right? If so, is there any way other than checking {{context.getIoCxt().getInputPath() == null}} to tell whether a record comes from the cache?
2. How should IOContext#getInputPath be initialized in the cache case?
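The two questions above can be illustrated with a small, self-contained Java sketch. Everything here ({{IoCtx}}, {{MapOpSketch}}, the call counter) is a simplified stand-in for Hive's real IOContext/MapOperator classes, not the actual API: {{process()}} skips the clean-up when the input path is null (a cached record), and {{contextsFor()}} shows the single-entry opCtxMap fallback that can only be safe for an unpartitioned table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for Hive's IOContext: the path is null when the
// row comes from the RDD cache instead of from a record reader.
class IoCtx {
    private final String inputPath;
    IoCtx(String inputPath) { this.inputPath = inputPath; }
    String getInputPath() { return inputPath; }
}

// Simplified stand-in for MapOperator, illustrating both questions.
class MapOpSketch {
    // nominal path -> per-operator context (payload simplified to String)
    final Map<String, String> opCtxMap = new LinkedHashMap<>();
    private String lastInputPath;
    int cleanUpCalls;  // counts reinitializations, for illustration only

    // Question 1: only treat the row as an input-file change when the
    // record reader actually populated the path; a null path means a
    // cached record, which needs no reinitialization.
    void process(IoCtx ctx) {
        String path = ctx.getInputPath();
        if (path == null) {
            return;  // cached record: skip cleanUpInputFileChanged
        }
        if (!path.equals(lastInputPath)) {
            cleanUpCalls++;  // stands in for cleanUpInputFileChanged()
            lastInputPath = path;
        }
    }

    // Question 2: fallback context lookup when the path is unavailable.
    // Only unambiguous for a single-entry map (unpartitioned table);
    // with multiple partitions there is nothing to select the entry by.
    String contextsFor(String path) {
        if (path == null && opCtxMap.size() == 1) {
            return opCtxMap.values().iterator().next();
        }
        return opCtxMap.get(path);
    }
}

public class Main {
    public static void main(String[] args) {
        MapOpSketch op = new MapOpSketch();
        op.process(new IoCtx("/t/000000_0"));  // first file: reinitialize
        op.process(new IoCtx(null));           // cached row: skipped
        op.process(new IoCtx("/t/000001_0"));  // new file: reinitialize
        System.out.println(op.cleanUpCalls);   // prints 2
    }
}
```

The single-entry fallback also makes clear why the partitioned case is hard: once opCtxMap has more than one entry, a null path gives no way to pick the right per-partition context.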
We need this input path to reinitialize MapOperator::currentCtxs in MapOperator#initializeContexts:
{code:java}
public void initializeContexts() {
  Path fpath = getExecContext().getCurrentInputPath();
  String nominalPath = getNominalPath(fpath);
  Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
  currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
}
{code}
Here opCtxMap (a Map<String, Map<Operator<?>, MapOpCtx>>) stores a MapOpCtx for every MapOperator. For a partitioned table, opCtxMap holds multiple entries (opCtxMap.keySet() is the set of partition names). So far I have only tested on an unpartitioned table, where I can directly use opCtxMap.values().iterator().next() to initialize the [context|https://github.com/kellyzly/hive/blob/HIVE-17486.4/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java#L713], and it runs successfully in yarn mode. But I suspect this is not correct for a partitioned table.

> Investigate to enable MapInput cache in Hive on Spark
> -----------------------------------------------------
>
>         Key: HIVE-18301
>         URL: https://issues.apache.org/jira/browse/HIVE-18301
>     Project: Hive
>  Issue Type: Bug
>    Reporter: liyunzhang
>    Assignee: liyunzhang
>    Priority: Major
>
> In HIVE-8920, an IOContext problem was found in MapTran when the Spark RDD cache was enabled,
> so we disabled the RDD cache in MapTran at
> [SparkPlanGenerator|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java#L202].
> The problem is that the IOContext does not seem to be initialized correctly in Spark yarn
> client/cluster mode, which causes an exception like:
> {code}
> Job aborted due to stage failure: Task 93 in stage 0.0 failed 4 times, most recent failure: Lost task 93.3 in stage 0.0 (TID 616, bdpe48): java.lang.RuntimeException: Error processing row: java.lang.NullPointerException
> 	at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:165)
> 	at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
> 	at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
> 	at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
> 	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> 	at org.apache.hadoop.hive.ql.exec.AbstractMapOperator.getNominalPath(AbstractMapOperator.java:101)
> 	at org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:516)
> 	at org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187)
> 	at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:546)
> 	at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:152)
> 	... 12 more
> Driver stacktrace:
> {code}
> In yarn client/cluster mode, [ExecMapperContext#currentInputPath|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java#L109] is sometimes null when the RDD cache is enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)