This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new fefd54d7fa1  [SPARK-38922][CORE] TaskLocation.apply throw NullPointerException
fefd54d7fa1 is described below

commit fefd54d7fa11c22517603d498a6273b237b867ef
Author: Kent Yao <y...@apache.org>
AuthorDate: Wed Apr 20 14:38:26 2022 +0800

    [SPARK-38922][CORE] TaskLocation.apply throw NullPointerException

    ### What changes were proposed in this pull request?

    `TaskLocation.apply` without a null check may throw a NullPointerException and fail job scheduling:

    ```
    Caused by: java.lang.NullPointerException
        at scala.collection.immutable.StringLike$class.stripPrefix(StringLike.scala:155)
        at scala.collection.immutable.StringOps.stripPrefix(StringOps.scala:29)
        at org.apache.spark.scheduler.TaskLocation$.apply(TaskLocation.scala:71)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal
    ```

    For instance, `org.apache.spark.rdd.HadoopRDD#convertSplitLocationInfo` might generate unexpected `Some(null)` elements; these should be built with `Option.apply` instead.

    ### Why are the changes needed?

    Fix the NPE.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    New tests.

    Closes #36222 from yaooqinn/SPARK-38922.

    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit 33e07f3cd926105c6d28986eb6218f237505549e)
    Signed-off-by: Kent Yao <y...@apache.org>
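[Editor's note] For readers of this commit, here is a minimal, self-contained Scala sketch of the failure mode the patch guards against. It is not part of the patch; `taskLocation` below is a hypothetical stand-in for `org.apache.spark.scheduler.TaskLocation.apply`, which, like the sketch, calls `stripPrefix` on the location string.

```scala
// Sketch only: why Some(null) is dangerous and Option.apply is not.
object NullLocationSketch {
  // Hypothetical stand-in for TaskLocation.apply; like the real method,
  // it calls stripPrefix on a (possibly null) location string.
  def taskLocation(str: String): String = str.stripPrefix("hdfs_cache_")

  def main(args: Array[String]): Unit = {
    val bad: Option[String] = Some(null)    // wraps the null: isDefined == true
    val good: Option[String] = Option(null) // drops the null: yields None
    println(bad.isDefined)   // true
    println(good.isDefined)  // false

    // Dereferencing the wrapped null inside stripPrefix throws the
    // NullPointerException shown in the stack trace above.
    try taskLocation(bad.get)
    catch { case e: NullPointerException => println(s"caught: $e") }

    // Filtering nulls first, as the DAGScheduler change below does,
    // keeps TaskLocation.apply from ever seeing a null preference.
    val prefs = List("host1", null, "host2")
    println(prefs.filter(_ != null).map(taskLocation)) // List(host1, host2)
  }
}
```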
---
 .../scala/org/apache/spark/rdd/HadoopRDD.scala     |  2 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  2 +-
 .../org/apache/spark/rdd/HadoopRDDSuite.scala      | 30 ++++++++++++++++++++++
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 5fc0b4f736d..ec9ab9c0663 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -452,7 +452,7 @@ private[spark] object HadoopRDD extends Logging {
       infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
     Option(infos).map(_.flatMap { loc =>
       val locationStr = loc.getLocation
-      if (locationStr != "localhost") {
+      if (locationStr != null && locationStr != "localhost") {
         if (loc.isInMemory) {
           logDebug(s"Partition $locationStr is cached by Hadoop.")
           Some(HDFSCacheTaskLocation(locationStr).toString)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index bc69f4c804e..759fd20ff2e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2377,7 +2377,7 @@ private[spark] class DAGScheduler(
     // If the RDD has some placement preferences (as is the case for input RDDs), get those
     val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
     if (rddPrefs.nonEmpty) {
-      return rddPrefs.map(TaskLocation(_))
+      return rddPrefs.filter(_ != null).map(TaskLocation(_))
     }
 
     // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
diff --git a/core/src/test/scala/org/apache/spark/rdd/HadoopRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/HadoopRDDSuite.scala
new file mode 100644
index 00000000000..b43d76c114c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/HadoopRDDSuite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.hadoop.mapred.SplitLocationInfo
+
+import org.apache.spark.SparkFunSuite
+
+class HadoopRDDSuite extends SparkFunSuite {
+
+  test("SPARK-38922: HadoopRDD convertSplitLocationInfo contains Some(null) cause NPE") {
+    val locs = Array(new SplitLocationInfo(null, false))
+    assert(HadoopRDD.convertSplitLocationInfo(locs).get.isEmpty)
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org