Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ed545763a -> 88ba2e1d0


[SPARK-17650] Malformed URLs throw exceptions before bricking Executors

## What changes were proposed in this pull request?

When a malformed URL is sent to Executors through `sc.addJar` or 
`sc.addFile`, the executors become unusable: they throw 
`MalformedURLException` over and over, and they have no way to acknowledge 
that the file or jar is simply bad input.

This PR fixes the problem by making sure malformed URLs can never be 
submitted through `sc.addJar` or `sc.addFile` in the first place: the URL is 
validated on the driver, which throws a `MalformedURLException` immediately 
instead of shipping the bad entry to the executors. An alternative would be 
to blacklist bad files and jars on the executors themselves, failing on the 
first attempt and then ignoring subsequent attempts (while printing a 
warning).
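
As a minimal illustration of the failure mode (a standalone sketch, not part of the patch): `java.net.URI` accepts strings that `java.net.URL` rejects, so a value like `http://user:pwd/path` parses cleanly as a URI and only blows up once something tries to turn it into a URL, which, before this fix, happened on the executors.

```scala
import java.net.{MalformedURLException, URI}

object MalformedUrlDemo extends App {
  // Parses fine as a URI: "user:pwd" is accepted as the authority component.
  val uri = new URI("http://user:pwd/path")
  println(s"URI parsed, authority = ${uri.getAuthority}")

  // Converting to a URL fails, because "pwd" is not a valid port number.
  try {
    uri.toURL
  } catch {
    case e: MalformedURLException =>
      println(s"Rejected eagerly on the driver: $e")
  }
}
```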

## How was this patch tested?

Unit tests in `SparkContextSuite`, covering `sc.addFile` and `sc.addJar` with 
malformed http/https/ftp URLs.

Author: Burak Yavuz <brk...@gmail.com>

Closes #15224 from brkyvz/SPARK-17650.

(cherry picked from commit 59d87d24079bc633e63ce032f0a5ddd18a3b02cb)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88ba2e1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88ba2e1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88ba2e1d

Branch: refs/heads/branch-2.0
Commit: 88ba2e1d0492039ee2cb1caa16160ec24bea3992
Parents: ed54576
Author: Burak Yavuz <brk...@gmail.com>
Authored: Sun Sep 25 22:57:31 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Sun Sep 25 22:57:42 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 16 +++++++-------
 .../scala/org/apache/spark/util/Utils.scala     | 20 ++++++++++++++++++
 .../org/apache/spark/SparkContextSuite.scala    | 22 ++++++++++++++++++++
 3 files changed, 51 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88ba2e1d/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ffd1227..e9f9d72 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.io._
 import java.lang.reflect.Constructor
-import java.net.URI
+import java.net.{MalformedURLException, URI}
 import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
@@ -36,10 +36,8 @@ import com.google.common.collect.MapMaker
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
-  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
-import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
-  TextInputFormat}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
@@ -47,8 +45,7 @@ import org.apache.mesos.MesosNativeLibrary
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat,
-  WholeTextFileInputFormat}
+import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
@@ -1441,6 +1438,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
           "turned on.")
       }
+    } else {
+      // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
+      Utils.validateURL(uri)
     }
 
     val key = if (!isLocal && scheme == "file") {
@@ -1700,6 +1700,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         key = env.rpcEnv.fileServer.addJar(new File(path))
       } else {
         val uri = new URI(path)
+        // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
+        Utils.validateURL(uri)
         key = uri.getScheme match {
           // A JAR file which exists only on the driver node
           case null | "file" =>

http://git-wip-us.apache.org/repos/asf/spark/blob/88ba2e1d/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a0ef30e..7764fdc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -698,6 +698,26 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Validate that a given URI is actually a valid URL as well.
+   * @param uri The URI to validate
+   */
+  @throws[MalformedURLException]("when the URI is an invalid URL")
+  def validateURL(uri: URI): Unit = {
+    Option(uri.getScheme).getOrElse("file") match {
+      case "http" | "https" | "ftp" =>
+        try {
+          uri.toURL
+        } catch {
+          case e: MalformedURLException =>
+            val ex = new MalformedURLException(s"URI (${uri.toString}) is not a valid URL.")
+            ex.initCause(e)
+            throw ex
+        }
+      case _ => // will not be turned into a URL anyway
+    }
+  }
+
+  /**
   * Get the path of a temporary directory.  Spark's local directories can be configured through
    * multiple settings, which are used with the following precedence:
    *

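For reference, a quick usage sketch of the new helper (hypothetical call sites; `Utils` is `private[spark]`, so code like this only compiles inside Spark itself). URIs whose scheme is never turned into a URL pass through untouched, while http/https/ftp URIs must survive the URI-to-URL conversion:

```scala
import java.net.{MalformedURLException, URI}
import org.apache.spark.util.Utils

// hdfs and scheme-less (implicitly "file") URIs are never converted to URLs: no-ops.
Utils.validateURL(new URI("hdfs://namenode:8020/jars/app.jar"))
Utils.validateURL(new URI("/local/path/app.jar"))

// http/https/ftp URIs are converted eagerly, so a bad one fails here on the
// driver rather than later on the executors.
try {
  Utils.validateURL(new URI("http://user:pwd/path"))
} catch {
  case e: MalformedURLException =>
    println(e.getMessage) // "URI (http://user:pwd/path) is not a valid URL."
}
```
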
http://git-wip-us.apache.org/repos/asf/spark/blob/88ba2e1d/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index f8d143d..c451c59 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io.File
+import java.net.MalformedURLException
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 
@@ -173,6 +174,27 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("SPARK-17650: malformed url's throw exceptions before bricking 
Executors") {
+    try {
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      Seq("http", "https", "ftp").foreach { scheme =>
+        val badURL = s"$scheme://user:pwd/path"
+        val e1 = intercept[MalformedURLException] {
+          sc.addFile(badURL)
+        }
+        assert(e1.getMessage.contains(badURL))
+        val e2 = intercept[MalformedURLException] {
+          sc.addJar(badURL)
+        }
+        assert(e2.getMessage.contains(badURL))
+        assert(sc.addedFiles.isEmpty)
+        assert(sc.addedJars.isEmpty)
+      }
+    } finally {
+      sc.stop()
+    }
+  }
+
   test("addFile recursive works") {
     val pluto = Utils.createTempDir()
     val neptune = Utils.createTempDir(pluto.getAbsolutePath)

