[jira] [Commented] (SPARK-20466) HadoopRDD#addLocalConfiguration throws NPE

2017-10-02 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188827#comment-16188827 ]

Apache Spark commented on SPARK-20466:
--------------------------------------

User 'sahilTakiar' has created a pull request for this issue:
https://github.com/apache/spark/pull/19413

> HadoopRDD#addLocalConfiguration throws NPE
> ------------------------------------------
>
>                 Key: SPARK-20466
>                 URL: https://issues.apache.org/jira/browse/SPARK-20466
>             Project: Spark
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 2.0.2
>            Reporter: liyunzhang_intel
>            Priority: Minor
>         Attachments: NPE_log
>
>
> In Spark 2.0.2, it throws an NPE:
> {code}
> 17/04/23 08:19:55 ERROR executor.Executor: Exception in task 439.0 in stage 16.0 (TID 986)
> java.lang.NullPointerException
>     at org.apache.spark.rdd.HadoopRDD$.addLocalConfiguration(HadoopRDD.scala:373)
>     at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:243)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:208)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:86)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Suggestion: add a null check to avoid the NPE:
> {code}
> /** Add Hadoop configuration specific to a single partition and attempt. */
> def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int,
>     attemptId: Int, conf: JobConf) {
>   val jobID = new JobID(jobTrackerId, jobId)
>   val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId)
>   if (conf != null) {
>     conf.set("mapred.tip.id", taId.getTaskID.toString)
>     conf.set("mapred.task.id", taId.toString)
>     conf.setBoolean("mapred.task.is.map", true)
>     conf.setInt("mapred.task.partition", splitId)
>     conf.set("mapred.job.id", jobID.toString)
>   }
> }
> {code}




[jira] [Commented] (SPARK-20466) HadoopRDD#addLocalConfiguration throws NPE

2017-09-25 Thread Sahil Takiar (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16179535#comment-16179535 ]

Sahil Takiar commented on SPARK-20466:
--------------------------------------

[~vanzin] thanks for taking a look. Yes, you are right. I'll start working on a 
PR.


[jira] [Commented] (SPARK-20466) HadoopRDD#addLocalConfiguration throws NPE

2017-09-25 Thread Marcelo Vanzin (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16179498#comment-16179498 ]

Marcelo Vanzin commented on SPARK-20466:
----------------------------------------

Explanation makes sense, but your proposed solution is also race-prone (two 
calls to {{getCachedMetadata}}). You want something like:

{code}
Option(HadoopRDD.getCachedMetadata(jobConfCacheKey))
  .getOrElse( /* recover from when there's no cached metadata */ )
{code}
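
For reference, a slightly fuller sketch of that idea. This is only an illustration, assuming {{HadoopRDD}}'s existing {{putCachedMetadata}} helper and {{broadcastedConf}} field; the miss branch is a placeholder, not the final patch:

{code}
// One read of the soft-value cache; binding the result inside the Option
// pins a hard reference, so GC cannot clear it between check and use.
Option(HadoopRDD.getCachedMetadata(jobConfCacheKey))
  .map { conf =>
    logDebug("Re-using cached JobConf")
    conf.asInstanceOf[JobConf]
  }
  .getOrElse {
    // Cache miss (never added, or reclaimed under GC pressure):
    // rebuild the JobConf from the broadcast configuration and re-cache it.
    val newJobConf = new JobConf(broadcastedConf.value.value)
    HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
    newJobConf
  }
{code}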



[jira] [Commented] (SPARK-20466) HadoopRDD#addLocalConfiguration throws NPE

2017-09-25 Thread Sahil Takiar (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16179463#comment-16179463 ]

Sahil Takiar commented on SPARK-20466:
--------------------------------------

[~srowen], [~vanzin] does my analysis of this bug make sense? If you agree this 
is a bug, I can make a PR to fix it.


[jira] [Commented] (SPARK-20466) HadoopRDD#addLocalConfiguration throws NPE

2017-09-19 Thread Sahil Takiar (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16172747#comment-16172747 ]

Sahil Takiar commented on SPARK-20466:
--------------------------------------

I can't remember exactly which query; I was running a chain of about 10 TPC-DS
queries, all in the same HoS session. It was a 1 TB Parquet dataset, though.

The exception occurs because {{HadoopRDD.containsCachedMetadata}} can return
{{true}}, but a subsequent call to {{HadoopRDD.getCachedMetadata}} on the same
key can still return {{null}}. This can happen if the JVM decides to GC some of
the entries in the metadata cache (which it can, since [soft
references|https://docs.oracle.com/javase/7/docs/api/java/lang/ref/SoftReference.html]
are used).

We would have to change it to something like:

{code}
} else {
  val conf = HadoopRDD.getCachedMetadata(jobConfCacheKey)
  if (conf != null) {
    logDebug("Re-using cached JobConf")
    HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
  }
}
{code}

I'm not sure how to write it exactly in Scala, but something like that. Once
you bind {{conf}} to the result of {{HadoopRDD.getCachedMetadata(jobConfCacheKey)}},
you have a hard reference to the object and it can no longer be GCed.
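
To make the hard-reference point concrete, a minimal standalone sketch (plain Scala with an explicit {{SoftReference}} map, not the actual Spark cache) of the same hazard and its avoidance:

{code}
import java.lang.ref.SoftReference
import scala.collection.concurrent.TrieMap

val cache = new TrieMap[String, SoftReference[AnyRef]]()
cache.put("key", new SoftReference[AnyRef]("cached value"))

// Racy, mirroring containsKey-then-get: the referent can be reclaimed
// between the two reads, so the second read may observe null.
if (cache.get("key").exists(_.get != null)) {
  val racy = cache("key").get  // may already be null here
}

// Safe: a single read. The local val is a hard reference, so the value
// cannot be reclaimed while this reference is live.
val pinned = cache.get("key").map(_.get).orNull
if (pinned != null) {
  // work with pinned
}
{code}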


[jira] [Commented] (SPARK-20466) HadoopRDD#addLocalConfiguration throws NPE

2017-09-18 Thread liyunzhang_intel (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171009#comment-16171009 ]

liyunzhang_intel commented on SPARK-20466:
------------------------------------------

[~stakiar]: which TPC-DS query did this exception happen on? I hit it in
another benchmark test (TPCx-BB).
{quote}
This cache uses soft references, so the JVM may reclaim entries from the map
whenever there is some GC pressure, in which case any get request on the key
will return null. The race condition is that the #getJobConf method first
checks whether the cache contains the key, and then retrieves it. In between
the containsKey and the get, it's possible that the key is GCed by the JVM.
{quote}
So the exception occurs because {{HadoopRDD.containsCachedMetadata(jobConfCacheKey)}}
checks an entry held by a soft reference, and the later lookup can return
{{null}} once a GC happens? If it is changed to
{code}
} else if (HadoopRDD.getCachedMetadata(jobConfCacheKey) != null) {
  logDebug("Re-using cached JobConf")
  HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
}
{code}
will {{HadoopRDD.getCachedMetadata(jobConfCacheKey)}} no longer return {{null}}
when a GC happens?


[jira] [Commented] (SPARK-20466) HadoopRDD#addLocalConfiguration throws NPE

2017-09-18 Thread Sahil Takiar (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170759#comment-16170759 ]

Sahil Takiar commented on SPARK-20466:
--------------------------------------

I just hit this issue in Hive-on-Spark when running some TPC-DS queries. It
seems to be intermittent; retries of the task succeed. I have a very similar
stack trace:

{code}
java.lang.NullPointerException
    at org.apache.spark.rdd.HadoopRDD$.addLocalConfiguration(HadoopRDD.scala:364)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:238)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:211)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}

The {{JobConf}} object can be {{null}} if {{HadoopRDD#getJobConf}} returns
{{null}}. It looks like there is a race condition in {{#getJobConf}}
[here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L160].
The method {{HadoopRDD.containsCachedMetadata}} looks into an internal
metadata cache, {{SparkEnv#hadoopJobMetadata}}. This cache uses soft
references, so the JVM may reclaim entries from the map whenever there is some
GC pressure, in which case any get request on the key will return {{null}}.
The race condition is that the {{#getJobConf}} method first checks whether the
cache contains the key, and then retrieves it. In between the {{containsKey}}
and the {{get}}, it's possible that the key is GCed by the JVM. This would
cause {{#getJobConf}} to return {{null}}.
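
For context, the racy branch looks roughly like this (paraphrased from the linked source):

{code}
// Paraphrased from HadoopRDD#getJobConf: a check against the soft-value
// cache followed by a separate get; GC can clear the entry in between.
} else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
  logDebug("Re-using cached JobConf")
  // If the entry was reclaimed after containsCachedMetadata returned true,
  // this lookup yields null and the cast produces a null JobConf.
  HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
}
{code}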

The fix should be pretty simple: don't use the {{containsKey(key)}} method on
the cache, just run a {{get(key)}} and check whether it returns {{null}}.
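
A minimal sketch of that fix (names as in {{HadoopRDD}}; the miss branch is elided):

{code}
// Single get: the local val pins the cached JobConf so it cannot be
// GCed between the null check and the cast.
val cachedConf = HadoopRDD.getCachedMetadata(jobConfCacheKey)
if (cachedConf != null) {
  logDebug("Re-using cached JobConf")
  cachedConf.asInstanceOf[JobConf]
} else {
  // fall through to creating and caching a new JobConf
}
{code}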

Happy to create a PR if others agree with my analysis.


[jira] [Commented] (SPARK-20466) HadoopRDD#addLocalConfiguration throws NPE

2017-06-20 Thread liyunzhang_intel (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056991#comment-16056991 ]

liyunzhang_intel commented on SPARK-20466:
------------------------------------------

[~q79969786]: I found the NPE when running Hive on Spark on TPCx-BB, but I
don't remember which query triggered it. In any case, it seems safer to add a
null check ({{if (conf != null)}}) in
[the code|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L374].



[jira] [Commented] (SPARK-20466) HadoopRDD#addLocalConfiguration throws NPE

2017-06-20 Thread Yuming Wang (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-20466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056985#comment-16056985 ]

Yuming Wang commented on SPARK-20466:
-------------------------------------

[~kellyzly] How to reproduce it?
