This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 662c5df7534 [SPARK-38922][CORE] TaskLocation.apply throws NullPointerException
662c5df7534 is described below

commit 662c5df75341473e5ea4057d5f8300516ca025fa
Author: Kent Yao <y...@apache.org>
AuthorDate: Wed Apr 20 14:38:26 2022 +0800

    [SPARK-38922][CORE] TaskLocation.apply throws NullPointerException
    
    ### What changes were proposed in this pull request?
    
    `TaskLocation.apply` without a null check may throw a NullPointerException and fail job scheduling:
    
    ```
    
    Caused by: java.lang.NullPointerException
        at scala.collection.immutable.StringLike$class.stripPrefix(StringLike.scala:155)
        at scala.collection.immutable.StringOps.stripPrefix(StringOps.scala:29)
        at org.apache.spark.scheduler.TaskLocation$.apply(TaskLocation.scala:71)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal
    ```
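    
    A minimal standalone sketch of the failure (the `"hdfs_cache_"` prefix is taken from `TaskLocation`'s in-memory location tag; any null string reaching `stripPrefix` fails the same way):
    
    ```scala
    // stripPrefix goes through Scala's StringOps wrapper, which
    // dereferences the underlying (null) string and throws the
    // NullPointerException shown in the trace above.
    val host: String = null
    host.stripPrefix("hdfs_cache_")
    ```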
    
    For instance, `org.apache.spark.rdd.HadoopRDD#convertSplitLocationInfo` may generate unexpected `Some(null)` elements, where `Some` should be replaced by `Option.apply`.
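    
    As a minimal sketch of the distinction in plain Scala, `Some` wraps its argument unconditionally, while `Option.apply` normalizes null to `None`:
    
    ```scala
    val loc: String = null
    Some(loc)    // Some(null): a defined Option that still carries null
    Option(loc)  // None: the null is filtered out at construction
    ```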
    
    ### Why are the changes needed?
    
    Fix the NullPointerException described above.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New tests added in `HadoopRDDSuite`.
    
    Closes #36222 from yaooqinn/SPARK-38922.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit 33e07f3cd926105c6d28986eb6218f237505549e)
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../scala/org/apache/spark/rdd/HadoopRDD.scala     |  2 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  2 +-
 .../org/apache/spark/rdd/HadoopRDDSuite.scala      | 30 ++++++++++++++++++++++
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 5fc0b4f736d..ec9ab9c0663 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -452,7 +452,7 @@ private[spark] object HadoopRDD extends Logging {
        infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
     Option(infos).map(_.flatMap { loc =>
       val locationStr = loc.getLocation
-      if (locationStr != "localhost") {
+      if (locationStr != null && locationStr != "localhost") {
         if (loc.isInMemory) {
           logDebug(s"Partition $locationStr is cached by Hadoop.")
           Some(HDFSCacheTaskLocation(locationStr).toString)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9a27d9cbad2..a82d261d545 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2528,7 +2528,7 @@ private[spark] class DAGScheduler(
     // If the RDD has some placement preferences (as is the case for input RDDs), get those
     val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
     if (rddPrefs.nonEmpty) {
-      return rddPrefs.map(TaskLocation(_))
+      return rddPrefs.filter(_ != null).map(TaskLocation(_))
     }
 
     // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
diff --git a/core/src/test/scala/org/apache/spark/rdd/HadoopRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/HadoopRDDSuite.scala
new file mode 100644
index 00000000000..b43d76c114c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/HadoopRDDSuite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.hadoop.mapred.SplitLocationInfo
+
+import org.apache.spark.SparkFunSuite
+
+class HadoopRDDSuite extends SparkFunSuite {
+
+  test("SPARK-38922: HadoopRDD convertSplitLocationInfo contains Some(null), causing NPE") {
+    val locs = Array(new SplitLocationInfo(null, false))
+    assert(HadoopRDD.convertSplitLocationInfo(locs).get.isEmpty)
+  }
+}

