Repository: spark Updated Branches: refs/heads/master ea59b0f3a -> 278281828
[SPARK-12350][CORE] Don't log errors when requested stream is not found. If a client requests a non-existent stream, just send a failure message back, without logging any error on the server side (since it's not a server error). On the executor side, avoid error logs by translating any errors during transfer to a `ClassNotFoundException`, so that loading the class is retried on the parent class loader. This can mask IO errors during transmission, but the most common cause is that the class is not served by the remote end. Author: Marcelo Vanzin <van...@cloudera.com> Closes #10337 from vanzin/SPARK-12350. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27828182 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27828182 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27828182 Branch: refs/heads/master Commit: 2782818287a71925523c1320291db6cb25221e9f Parents: ea59b0f Author: Marcelo Vanzin <van...@cloudera.com> Authored: Fri Dec 18 09:49:08 2015 -0800 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Fri Dec 18 09:49:08 2015 -0800 ---------------------------------------------------------------------- .../apache/spark/rpc/netty/NettyRpcEnv.scala | 17 ++++++++-------- .../spark/rpc/netty/NettyStreamManager.scala | 7 +++++-- .../spark/network/server/StreamManager.java | 1 + .../network/server/TransportRequestHandler.java | 7 ++++++- .../apache/spark/repl/ExecutorClassLoader.scala | 21 ++++++++++++++++++-- 5 files changed, 39 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 
de3db6b..975ea1a 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -363,15 +363,14 @@ private[netty] class NettyRpcEnv( } override def read(dst: ByteBuffer): Int = { - val result = if (error == null) { - Try(source.read(dst)) - } else { - Failure(error) - } - - result match { + Try(source.read(dst)) match { case Success(bytesRead) => bytesRead - case Failure(error) => throw error + case Failure(readErr) => + if (error != null) { + throw error + } else { + throw readErr + } } } @@ -397,7 +396,7 @@ private[netty] class NettyRpcEnv( } override def onFailure(streamId: String, cause: Throwable): Unit = { - logError(s"Error downloading stream $streamId.", cause) + logDebug(s"Error downloading stream $streamId.", cause) source.setError(cause) sink.close() } http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala index 394cde4..afcb023 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala @@ -58,8 +58,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv) new File(dir, fname) } - require(file != null && file.isFile(), s"File not found: $streamId") - new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length()) + if (file != null && file.isFile()) { + new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length()) + } else { + null + } } override def addFile(file: File): String = { http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java 
---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java index 3f01559..07f161a 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java @@ -54,6 +54,7 @@ public abstract class StreamManager { * {@link #getChunk(long, int)} method. * * @param streamId id of a stream that has been previously registered with the StreamManager. + * @return A managed buffer for the stream, or null if the stream was not found. */ public ManagedBuffer openStream(String streamId) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ---------------------------------------------------------------------- diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index c864d7c..105f538 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -141,7 +141,12 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> { return; } - respond(new StreamResponse(req.streamId, buf.size(), buf)); + if (buf != null) { + respond(new StreamResponse(req.streamId, buf.size(), buf)); + } else { + respond(new StreamFailure(req.streamId, String.format( + "Stream '%s' was not found.", req.streamId))); + } } private void processRpcRequest(final RpcRequest req) { 
http://git-wip-us.apache.org/repos/asf/spark/blob/27828182/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index da8f0aa..de7b831 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -17,7 +17,7 @@ package org.apache.spark.repl -import java.io.{IOException, ByteArrayOutputStream, InputStream} +import java.io.{FilterInputStream, ByteArrayOutputStream, InputStream, IOException} import java.net.{HttpURLConnection, URI, URL, URLEncoder} import java.nio.channels.Channels @@ -96,7 +96,24 @@ class ExecutorClassLoader( private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = { val channel = env.rpcEnv.openChannel(s"$classUri/$path") - Channels.newInputStream(channel) + new FilterInputStream(Channels.newInputStream(channel)) { + + override def read(): Int = toClassNotFound(super.read()) + + override def read(b: Array[Byte]): Int = toClassNotFound(super.read(b)) + + override def read(b: Array[Byte], offset: Int, len: Int) = + toClassNotFound(super.read(b, offset, len)) + + private def toClassNotFound(fn: => Int): Int = { + try { + fn + } catch { + case e: Exception => + throw new ClassNotFoundException(path, e) + } + } + } } private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): InputStream = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org