Repository: spark
Updated Branches:
  refs/heads/master ea59b0f3a -> 278281828


[SPARK-12350][CORE] Don't log errors when requested stream is not found.

If a client requests a non-existent stream, just send a failure message
back, without logging any error on the server side (since it's not a
server error).

On the executor side, avoid error logs by translating any errors during
transfer to a `ClassNotFoundException`, so that loading the class is
retried on a the parent class loader. This can mask IO errors during
transmission, but the most common cause is that the class is not
served by the remote end.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #10337 from vanzin/SPARK-12350.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27828182
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27828182
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27828182

Branch: refs/heads/master
Commit: 2782818287a71925523c1320291db6cb25221e9f
Parents: ea59b0f
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Fri Dec 18 09:49:08 2015 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Fri Dec 18 09:49:08 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/rpc/netty/NettyRpcEnv.scala    | 17 ++++++++--------
 .../spark/rpc/netty/NettyStreamManager.scala    |  7 +++++--
 .../spark/network/server/StreamManager.java     |  1 +
 .../network/server/TransportRequestHandler.java |  7 ++++++-
 .../apache/spark/repl/ExecutorClassLoader.scala | 21 ++++++++++++++++++--
 5 files changed, 39 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index de3db6b..975ea1a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -363,15 +363,14 @@ private[netty] class NettyRpcEnv(
     }
 
     override def read(dst: ByteBuffer): Int = {
-      val result = if (error == null) {
-        Try(source.read(dst))
-      } else {
-        Failure(error)
-      }
-
-      result match {
+      Try(source.read(dst)) match {
         case Success(bytesRead) => bytesRead
-        case Failure(error) => throw error
+        case Failure(readErr) =>
+          if (error != null) {
+            throw error
+          } else {
+            throw readErr
+          }
       }
     }
 
@@ -397,7 +396,7 @@ private[netty] class NettyRpcEnv(
     }
 
     override def onFailure(streamId: String, cause: Throwable): Unit = {
-      logError(s"Error downloading stream $streamId.", cause)
+      logDebug(s"Error downloading stream $streamId.", cause)
       source.setError(cause)
       sink.close()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index 394cde4..afcb023 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -58,8 +58,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
         new File(dir, fname)
     }
 
-    require(file != null && file.isFile(), s"File not found: $streamId")
-    new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
+    if (file != null && file.isFile()) {
+      new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, 
file.length())
+    } else {
+      null
+    }
   }
 
   override def addFile(file: File): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
 
b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
index 3f01559..07f161a 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
@@ -54,6 +54,7 @@ public abstract class StreamManager {
    * {@link #getChunk(long, int)} method.
    *
    * @param streamId id of a stream that has been previously registered with 
the StreamManager.
+   * @return A managed buffer for the stream, or null if the stream was not 
found.
    */
   public ManagedBuffer openStream(String streamId) {
     throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 
b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index c864d7c..105f538 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -141,7 +141,12 @@ public class TransportRequestHandler extends 
MessageHandler<RequestMessage> {
       return;
     }
 
-    respond(new StreamResponse(req.streamId, buf.size(), buf));
+    if (buf != null) {
+      respond(new StreamResponse(req.streamId, buf.size(), buf));
+    } else {
+      respond(new StreamFailure(req.streamId, String.format(
+        "Stream '%s' was not found.", req.streamId)));
+    }
   }
 
   private void processRpcRequest(final RpcRequest req) {

http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
----------------------------------------------------------------------
diff --git 
a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala 
b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index da8f0aa..de7b831 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.repl
 
-import java.io.{IOException, ByteArrayOutputStream, InputStream}
+import java.io.{FilterInputStream, ByteArrayOutputStream, InputStream, 
IOException}
 import java.net.{HttpURLConnection, URI, URL, URLEncoder}
 import java.nio.channels.Channels
 
@@ -96,7 +96,24 @@ class ExecutorClassLoader(
 
   private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = 
{
     val channel = env.rpcEnv.openChannel(s"$classUri/$path")
-    Channels.newInputStream(channel)
+    new FilterInputStream(Channels.newInputStream(channel)) {
+
+      override def read(): Int = toClassNotFound(super.read())
+
+      override def read(b: Array[Byte]): Int = toClassNotFound(super.read(b))
+
+      override def read(b: Array[Byte], offset: Int, len: Int) =
+        toClassNotFound(super.read(b, offset, len))
+
+      private def toClassNotFound(fn: => Int): Int = {
+        try {
+          fn
+        } catch {
+          case e: Exception =>
+            throw new ClassNotFoundException(path, e)
+        }
+      }
+    }
   }
 
   private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): 
InputStream = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to