Repository: spark
Updated Branches:
  refs/heads/branch-1.2 7b7db5942 -> f7fe87f4b


[SPARK-6209] Clean up connections in ExecutorClassLoader after failing to load 
classes (branch-1.2)

ExecutorClassLoader does not ensure proper cleanup of network connections that 
it opens. If it fails to load a class, it may leak partially-consumed 
InputStreams that are connected to the REPL's HTTP class server, causing that 
server to exhaust its thread pool, which can cause the entire job to hang.  See 
[SPARK-6209](https://issues.apache.org/jira/browse/SPARK-6209) for more 
details, including a bug reproduction.

This patch fixes this issue by ensuring proper cleanup of these resources.  It 
also adds logging for unexpected error cases.

(See #4944 for the corresponding PR for 1.3/1.4).

Author: Josh Rosen <joshro...@databricks.com>

Closes #5174 from JoshRosen/executorclassloaderleak-branch-1.2 and squashes the 
following commits:

16e38fe [Josh Rosen] [SPARK-6209] Clean up connections in ExecutorClassLoader 
after failing to load classes (master branch PR)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7fe87f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7fe87f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7fe87f4

Branch: refs/heads/branch-1.2
Commit: f7fe87f4b71f469f3c67a00883697ff09cc8a96a
Parents: 7b7db59
Author: Josh Rosen <joshro...@databricks.com>
Authored: Sun Apr 5 13:59:48 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Sun Apr 5 13:59:48 2015 -0700

----------------------------------------------------------------------
 repl/pom.xml                                    |  5 ++
 .../apache/spark/repl/ExecutorClassLoader.scala | 92 ++++++++++++++++----
 .../spark/repl/ExecutorClassLoaderSuite.scala   | 70 ++++++++++++++-
 3 files changed, 148 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f7fe87f4/repl/pom.xml
----------------------------------------------------------------------
diff --git a/repl/pom.xml b/repl/pom.xml
index 7eb92d5..b91ae8b 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -96,6 +96,11 @@
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

http://git-wip-us.apache.org/repos/asf/spark/blob/f7fe87f4/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
----------------------------------------------------------------------
diff --git 
a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala 
b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 5ee3250..439fd14 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.repl
 
-import java.io.{ByteArrayOutputStream, InputStream}
-import java.net.{URI, URL, URLEncoder}
-import java.util.concurrent.{Executors, ExecutorService}
+import java.io.{IOException, ByteArrayOutputStream, InputStream}
+import java.net.{HttpURLConnection, URI, URL, URLEncoder}
+
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 import org.apache.spark.util.ParentClassLoader
@@ -37,12 +38,15 @@ import 
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
  * Allows the user to specify if user class path should be first
  */
 class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: 
ClassLoader,
-    userClassPathFirst: Boolean) extends ClassLoader {
+    userClassPathFirst: Boolean) extends ClassLoader with Logging {
   val uri = new URI(classUri)
   val directory = uri.getPath
 
   val parentLoader = new ParentClassLoader(parent)
 
+  // Allows HTTP connect and read timeouts to be controlled for testing / 
debugging purposes
+  private[repl] var httpUrlConnectionTimeoutMillis: Int = -1
+
   // Hadoop FileSystem object for our URI, if it isn't using HTTP
   var fileSystem: FileSystem = {
     if (uri.getScheme() == "http") {
@@ -71,27 +75,81 @@ class ExecutorClassLoader(conf: SparkConf, classUri: 
String, parent: ClassLoader
     }
   }
 
+  private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): 
InputStream = {
+    val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
+      val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
+      val newuri = Utils.constructURIForAuthentication(uri, 
SparkEnv.get.securityManager)
+      newuri.toURL
+    } else {
+      new URL(classUri + "/" + urlEncode(pathInDirectory))
+    }
+    val connection: HttpURLConnection = 
url.openConnection().asInstanceOf[HttpURLConnection]
+    // Set the connection timeouts (for testing purposes)
+    if (httpUrlConnectionTimeoutMillis != -1) {
+      connection.setConnectTimeout(httpUrlConnectionTimeoutMillis)
+      connection.setReadTimeout(httpUrlConnectionTimeoutMillis)
+    }
+    connection.connect()
+    try {
+      if (connection.getResponseCode != 200) {
+        // Close the error stream so that the connection is eligible for re-use
+        try {
+          connection.getErrorStream.close()
+        } catch {
+          case ioe: IOException =>
+            logError("Exception while closing error stream", ioe)
+        }
+        throw new ClassNotFoundException(s"Class file not found at URL $url")
+      } else {
+        connection.getInputStream
+      }
+    } catch {
+      case NonFatal(e) if !e.isInstanceOf[ClassNotFoundException] =>
+        connection.disconnect()
+        throw e
+    }
+  }
+
+  private def getClassFileInputStreamFromFileSystem(pathInDirectory: String): 
InputStream = {
+    val path = new Path(directory, pathInDirectory)
+    if (fileSystem.exists(path)) {
+      fileSystem.open(path)
+    } else {
+      throw new ClassNotFoundException(s"Class file not found at path $path")
+    }
+  }
+
   def findClassLocally(name: String): Option[Class[_]] = {
+    val pathInDirectory = name.replace('.', '/') + ".class"
+    var inputStream: InputStream = null
     try {
-      val pathInDirectory = name.replace('.', '/') + ".class"
-      val inputStream = {
+      inputStream = {
         if (fileSystem != null) {
-          fileSystem.open(new Path(directory, pathInDirectory))
+          getClassFileInputStreamFromFileSystem(pathInDirectory)
         } else {
-          if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
-            val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
-            val newuri = Utils.constructURIForAuthentication(uri, 
SparkEnv.get.securityManager)
-            newuri.toURL().openStream()
-          } else {
-            new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
-          }
+          getClassFileInputStreamFromHttpServer(pathInDirectory)
         }
       }
       val bytes = readAndTransformClass(name, inputStream)
-      inputStream.close()
       Some(defineClass(name, bytes, 0, bytes.length))
     } catch {
-      case e: Exception => None
+      case e: ClassNotFoundException =>
+        // We did not find the class
+        logDebug(s"Did not load class $name from REPL class server at $uri", e)
+        None
+      case e: Exception =>
+        // Something bad happened while checking if the class exists
+        logError(s"Failed to check existence of class $name on REPL class 
server at $uri", e)
+        None
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close()
+        } catch {
+          case e: Exception =>
+            logError("Exception while closing inputStream", e)
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f7fe87f4/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
----------------------------------------------------------------------
diff --git 
a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala 
b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index 6a79e76..c709cde 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -20,13 +20,25 @@ package org.apache.spark.repl
 import java.io.File
 import java.net.{URL, URLClassLoader}
 
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
+import org.scalatest.concurrent.Interruptor
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.mock.MockitoSugar
+import org.mockito.Mockito._
 
-import org.apache.spark.{SparkConf, TestUtils}
+import org.apache.spark._
 import org.apache.spark.util.Utils
 
-class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
+class ExecutorClassLoaderSuite
+  extends FunSuite
+  with BeforeAndAfterAll
+  with MockitoSugar
+  with Logging {
 
   val childClassNames = List("ReplFakeClass1", "ReplFakeClass2")
   val parentClassNames = List("ReplFakeClass1", "ReplFakeClass2", 
"ReplFakeClass3")
@@ -34,6 +46,7 @@ class ExecutorClassLoaderSuite extends FunSuite with 
BeforeAndAfterAll {
   var tempDir2: File = _
   var url1: String = _
   var urls2: Array[URL] = _
+  var classServer: HttpServer = _
 
   override def beforeAll() {
     super.beforeAll()
@@ -47,8 +60,12 @@ class ExecutorClassLoaderSuite extends FunSuite with 
BeforeAndAfterAll {
 
   override def afterAll() {
     super.afterAll()
+    if (classServer != null) {
+      classServer.stop()
+    }
     Utils.deleteRecursively(tempDir1)
     Utils.deleteRecursively(tempDir2)
+    SparkEnv.set(null)
   }
 
   test("child first") {
@@ -83,4 +100,53 @@ class ExecutorClassLoaderSuite extends FunSuite with 
BeforeAndAfterAll {
     }
   }
 
+  test("failing to fetch classes from HTTP server should not leak resources 
(SPARK-6209)") {
+    // This is a regression test for SPARK-6209, a bug where each failed 
attempt to load a class
+    // from the driver's class server would leak an HTTP connection, causing 
the class server's
+    // thread / connection pool to be exhausted.
+    val conf = new SparkConf()
+    val securityManager = new SecurityManager(conf)
+    classServer = new HttpServer(conf, tempDir1, securityManager)
+    classServer.start()
+    // ExecutorClassLoader uses SparkEnv's SecurityManager, so we need to mock 
this
+    val mockEnv = mock[SparkEnv]
+    when(mockEnv.securityManager).thenReturn(securityManager)
+    SparkEnv.set(mockEnv)
+    // Create an ExecutorClassLoader that's configured to load classes from 
the HTTP server
+    val parentLoader = new URLClassLoader(Array.empty, null)
+    val classLoader = new ExecutorClassLoader(conf, classServer.uri, 
parentLoader, false)
+    classLoader.httpUrlConnectionTimeoutMillis = 500
+    // Check that this class loader can actually load classes that exist
+    val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
+    val fakeClassVersion = fakeClass.toString
+    assert(fakeClassVersion === "1")
+    // Try to perform a full GC now, since GC during the test might mask 
resource leaks
+    System.gc()
+    // When the original bug occurs, the test thread becomes blocked in a 
classloading call
+    // and does not respond to interrupts.  Therefore, use a custom ScalaTest 
interruptor to
+    // shut down the HTTP server when the test times out
+    val interruptor: Interruptor = new Interruptor {
+      override def apply(thread: Thread): Unit = {
+        classServer.stop()
+        classServer = null
+        thread.interrupt()
+      }
+    }
+    def tryAndFailToLoadABunchOfClasses(): Unit = {
+      // The number of trials here should be much larger than Jetty's thread / 
connection limit
+      // in order to expose thread or connection leaks
+      for (i <- 1 to 1000) {
+        if (Thread.currentThread().isInterrupted) {
+          throw new InterruptedException()
+        }
+        // Incorporate the iteration number into the class name in order to 
avoid any response
+        // caching that might be added in the future
+        intercept[ClassNotFoundException] {
+          classLoader.loadClass(s"ReplFakeClassDoesNotExist$i").newInstance()
+        }
+      }
+    }
+    failAfter(10 seconds)(tryAndFailToLoadABunchOfClasses())(interruptor)
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to