[GitHub] spark pull request #23242: [SPARK-26285][CORE] accumulator metrics sources f...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23242#discussion_r239994508

    --- Diff: examples/src/main/scala/org/apache/spark/examples/AccumulatorMetricsTest.scala ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +// scalastyle:off println
    +package org.apache.spark.examples
    +
    +import org.apache.spark.metrics.source.{DoubleAccumulatorSource, LongAccumulatorSource}
    +import org.apache.spark.sql.SparkSession
    +
    +/**
    + * Usage: AccumulatorMetricsTest [partitions] [numElem] [blockSize]
    + */
    +object AccumulatorMetricsTest {
    --- End diff --

    An example named as a Test is a bit confusing, I think... thoughts?

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23166#discussion_r237652232

    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
         override def handleConnection(sock: Socket): Unit = {
           val env = SparkEnv.get
           val in = sock.getInputStream()
    -      val dir = new File(Utils.getLocalDir(env.conf))
    -      val file = File.createTempFile("broadcast", "", dir)
    -      path = file.getAbsolutePath
    -      val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
    +      val abspath = new File(path).getAbsolutePath
    +      val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
    --- End diff --

    In the old version, with encryption turned off, we generated a random path and both read from and wrote to that path. When the encryption-related code was written, it introduced a new "broadcast" temp path. The problem is that when we try to decrypt on the driver side, the code looks at the random path reference that is still lying around and tries to decrypt from it, while the actual data is in the new "broadcast" path location. So, by just passing the random path reference through, we make sure all the places stay in sync both with and without encryption.
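The path mismatch described in this comment can be illustrated with a small standalone sketch. This is not Spark code: `old_handler_write`, `fixed_handler_write`, `read_back` and `demo` are hypothetical names, encryption is left out entirely, and plain `pickle` files stand in for the broadcast data. It only shows why reading from a stale path reference fails while writing to the referenced path keeps reader and writer in sync.

```python
import os
import pickle
import tempfile


def old_handler_write(value, referenced_path):
    """Mimics the described bug: the data lands in a fresh temp file,
    not in referenced_path, which the reader still holds."""
    fd, new_path = tempfile.mkstemp(prefix="broadcast")
    with os.fdopen(fd, "wb") as out:
        pickle.dump(value, out)
    return new_path  # the reader never learns about this path


def fixed_handler_write(value, referenced_path):
    """Mimics the fix: write to the path the reader already references."""
    abspath = os.path.abspath(referenced_path)
    with open(abspath, "wb") as out:
        pickle.dump(value, out)
    return abspath


def read_back(path):
    """Reader side: load whatever is stored at the referenced path."""
    with open(path, "rb") as f:
        return pickle.load(f)


def demo():
    words = ["scala", "java", "hadoop", "spark", "akka"]
    # Empty file standing in for the stale "random path" reference.
    fd, referenced_path = tempfile.mkstemp(prefix="random")
    os.close(fd)

    stray_path = old_handler_write(words, referenced_path)
    try:
        read_back(referenced_path)
        old_result = "read succeeded"
    except EOFError as exc:
        old_result = "EOFError: {}".format(exc)

    fixed_handler_write(words, referenced_path)
    new_result = read_back(referenced_path)

    os.remove(stray_path)
    os.remove(referenced_path)
    return old_result, new_result
```

In this toy model, the first read hits an empty file and fails with an `EOFError`, the same exception class that appears in the traceback of the pull request description; the second read succeeds because the writer used the referenced path.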
[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23166#discussion_r237250388

    --- Diff: python/pyspark/broadcast.py ---
    @@ -134,7 +137,15 @@ def value(self):
             """ Return the broadcasted value
             """
             if not hasattr(self, "_value") and self._path is not None:
    -            self._value = self.load_from_path(self._path)
    +            # we only need to decrypt it here when encryption is enabled and
    --- End diff --

    Sorry, yes, I thought hasattr would take care of that, my bad. Earlier I had the check `self._sc is not None`; I will add it back in load_from_path.
[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23166#discussion_r237231750

    --- Diff: python/pyspark/broadcast.py ---
    @@ -118,8 +121,16 @@ def dump(self, value, f):
             f.close()

         def load_from_path(self, path):
    -        with open(path, 'rb', 1 << 20) as f:
    -            return self.load(f)
    +        # we only need to decrypt it here if its on the driver since executor
    +        # decryption handled already
    +        if self._sc is not None and self._sc._encryption_enabled:
    --- End diff --

    Makes sense, will move it there.
[GitHub] spark issue #23166: [SPARK-26201] Fix python broadcast with encryption
Github user redsanket commented on the issue:

    https://github.com/apache/spark/pull/23166

    @squito Yes, looking at the code, we could maintain a soft reference so that we don't have to keep requesting the value from disk. It is definitely an optimization.
[GitHub] spark pull request #23166: [SPARK-26201] Fix python broadcast with encryptio...
GitHub user redsanket opened a pull request:

    https://github.com/apache/spark/pull/23166

    [SPARK-26201] Fix python broadcast with encryption

    ## What changes were proposed in this pull request?

    With RPC and disk encryption enabled, creating a Python broadcast variable and simply reading its value back on the driver side made the job fail with:

    Traceback (most recent call last):
      File "broadcast.py", line 37, in words_new.value
      File "/pyspark.zip/pyspark/broadcast.py", line 137, in value
      File "pyspark.zip/pyspark/broadcast.py", line 122, in load_from_path
      File "pyspark.zip/pyspark/broadcast.py", line 128, in load
    EOFError: Ran out of input

    To reproduce, use the configs:

    --conf spark.network.crypto.enabled=true
    --conf spark.io.encryption.enabled=true

    Code:

    words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
    words_new.value
    print(words_new.value)

    ## How was this patch tested?

    words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
    textFile = sc.textFile("README.md")
    wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word + words_new.value[1], 1)).reduceByKey(lambda a, b: a+b)
    count = wordCounts.count()
    print(count)
    words_new.value
    print(words_new.value)
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/redsanket/spark SPARK-26201

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23166.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23166

commit 67a2ac87fb6e2d3fd4a5f260047a37bd2858228d
Author: schintap
Date:   2018-11-28T16:20:55Z

    [SPARK-26201] Fix python broadcast with encryption
[GitHub] spark pull request #22173: [SPARK-24355] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22173#discussion_r227118510

    --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java ---
    @@ -77,17 +82,54 @@
       private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
       private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
    +  // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling
    +  // max number of TransportServer worker threads that are blocked on writing response
    +  // of ChunkFetchRequest message back to the client via the underlying channel.
    +  private static EventLoopGroup chunkFetchWorkers;
    --- End diff --

    I haven't been able to reproduce this, but the number of threads used for these tests is either 2 * the number of cores or spark.shuffle.io.serverThreads.
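The idea behind the separate pool in the quoted diff can be sketched outside of Netty. The following is a simplified analogy using Python's standard `concurrent.futures`, not the actual Spark or Netty mechanism: `run_demo`, `fetch_chunk` and `handle_rpc` are hypothetical names, and a `threading.Event` stands in for slow disk reads. It only shows that when chunk fetches are confined to their own bounded pool, a short RPC on another pool is still answered while every fetch is blocked.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_demo(chunk_threads=2, rpc_threads=1, pending_fetches=8):
    disk_ready = threading.Event()  # stands in for contended disk reads

    def fetch_chunk(i):
        disk_ready.wait()  # blocks until the "disk" becomes available
        return "chunk-{}".format(i)

    def handle_rpc(name):
        return "ack-{}".format(name)  # cheap message, no disk access

    # Bounded pool reserved for chunk fetches, separate pool for other RPCs.
    chunk_pool = ThreadPoolExecutor(max_workers=chunk_threads)
    rpc_pool = ThreadPoolExecutor(max_workers=rpc_threads)
    try:
        fetches = [chunk_pool.submit(fetch_chunk, i) for i in range(pending_fetches)]
        # The RPC completes even though all chunk workers are blocked.
        rpc_reply = rpc_pool.submit(handle_rpc, "register-executor").result(timeout=5)
        still_blocked = sum(1 for f in fetches if not f.done())
        disk_ready.set()
        chunks = [f.result(timeout=5) for f in fetches]
    finally:
        disk_ready.set()  # make sure no worker stays blocked on error
        chunk_pool.shutdown()
        rpc_pool.shutdown()
    return rpc_reply, still_blocked, chunks
```

If both kinds of work shared a single pool of two workers, the eight pending fetches would occupy every worker and the RPC would wait behind them, which is the starvation the dedicated pool is meant to avoid.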
[GitHub] spark issue #21402: SPARK-24355 Spark external shuffle server improvement to...
Github user redsanket commented on the issue:

    https://github.com/apache/spark/pull/21402

    @cloud-fan yes we can close this
[GitHub] spark issue #22628: [SPARK-25641] Change the spark.shuffle.server.chunkFetch...
Github user redsanket commented on the issue:

    https://github.com/apache/spark/pull/22628

    Thanks @tgravescs for explaining the issue
[GitHub] spark issue #22628: [SPARK-25641] Change the spark.shuffle.server.chunkFetch...
Github user redsanket commented on the issue:

    https://github.com/apache/spark/pull/22628

    @tgravescs @abellina please take a look, thanks
[GitHub] spark pull request #22628: Change the spark.shuffle.server.chunkFetchHandler...
GitHub user redsanket opened a pull request:

    https://github.com/apache/spark/pull/22628

    Change the spark.shuffle.server.chunkFetchHandlerThreadsPercent default to 100

    ## What changes were proposed in this pull request?

    We want to change the default of spark.shuffle.server.chunkFetchHandlerThreadsPercent to 100. The default is currently 0, which means that even if spark.shuffle.io.serverThreads > 0, the default number of threads is 2 * #cores instead of spark.shuffle.io.serverThreads. When the percentage is not set at all, we want the default to be spark.shuffle.io.serverThreads. Also, a value of 0 here would still mean 2 * #cores.

    ## How was this patch tested?

    Manual

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/redsanket/spark SPARK-25641

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22628.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22628

commit 0258d197e27e6a053023d1b49955343659e240ac
Author: Sanket Chintapalli
Date:   2018-10-04T14:34:29Z

    Change the spark.shuffle.server.chunkFetchHandlerThreadsPercent default to 100
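The effect of the default change can be worked through with a small sketch. This is a hypothetical Python model of the behaviour described in the pull request, not the Java code from TransportConf: the function name and parameters are illustrative, rounding up is an assumption, and treating a result of 0 as "fall back to 2 * #cores" follows the description above.

```python
import math


def chunk_fetch_handler_threads(percent, server_threads, cores):
    """Model of the described sizing rule (assumption, not the Spark source).

    percent        -- value of chunkFetchHandlerThreadsPercent
    server_threads -- value of io.serverThreads (0 means "not set")
    cores          -- number of available cores
    """
    # Base pool size: io.serverThreads if set, otherwise 2 * #cores.
    base = server_threads if server_threads > 0 else 2 * cores
    requested = math.ceil(base * percent / 100.0)
    # A result of 0 is modelled as "use the default of 2 * #cores".
    return requested if requested > 0 else 2 * cores


# Old default (0): io.serverThreads=8 is ignored on a 16-core host.
old_default = chunk_fetch_handler_threads(0, server_threads=8, cores=16)
# New default (100): io.serverThreads=8 is honoured.
new_default = chunk_fetch_handler_threads(100, server_threads=8, cores=16)
```

Under these assumptions, `old_default` is 32 and `new_default` is 8, which matches the motivation given in the description: with a default of 100 the handler pool follows io.serverThreads when it is set.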
[GitHub] spark issue #22173: [SPARK-24355] Spark external shuffle server improvement ...
Github user redsanket commented on the issue:

    https://github.com/apache/spark/pull/22173

    closes #21402
[GitHub] spark issue #22173: [SPARK-24355] Spark external shuffle server improvement ...
Github user redsanket commented on the issue:

    https://github.com/apache/spark/pull/22173

    test this please
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22173#discussion_r218942035

    --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java ---
    @@ -77,17 +82,54 @@
       private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
       private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
    +  // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling
    +  // max number of TransportServer worker threads that are blocked on writing response
    +  // of ChunkFetchRequest message back to the client via the underlying channel.
    +  private static EventLoopGroup chunkFetchWorkers;
    +
       public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    -    this(conf, rpcHandler, false);
    +    this(conf, rpcHandler, false, false);
       }
       public TransportContext(
           TransportConf conf,
           RpcHandler rpcHandler,
           boolean closeIdleConnections) {
    +    this(conf, rpcHandler, closeIdleConnections, false);
    +  }
    +
    +    /**
    +     *
    +     * @param conf TransportConf
    +     * @param rpcHandler RpcHandler responsible for handling requests and responses.
    +     * @param closeIdleConnections Close idle connections if it is set to true.
    +     * @param isClientOnly This config is more important when external shuffle is enabled.
    +     *                     It stops creating extra event loop and subsequent thread pool
    +     *                     for shuffle clients to handle chunked fetch requests.
    +     *                     In the case when external shuffle is disabled, the executors are both
    +     *                     client and server so both share the same event loop which is trivial.
    --- End diff --

    I hope we follow a similar indentation for all other comments
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22173#discussion_r218939333

    --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java ---
    @@ -98,21 +98,32 @@ public TransportContext(
         this(conf, rpcHandler, closeIdleConnections, false);
       }
    +    /**
    +     *
    +     * @param conf TransportConf
    +     * @param rpcHandler RpcHandler responsible for handling requests and responses.
    +     * @param closeIdleConnections Close idle connections if is set to true.
    +     * @param isClientOnly This config is more important when external shuffle is enabled.
    --- End diff --

    I think for comments we follow the same spacing convention as observed here, so I am sticking with it...
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22173#discussion_r218930135

    --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java ---
    @@ -77,17 +82,43 @@
       private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
       private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
    +  // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling
    +  // max number of TransportServer worker threads that are blocked on writing response
    +  // of ChunkFetchRequest message back to the client via the underlying channel.
    +  private static EventLoopGroup chunkFetchWorkers;
    +
       public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    -    this(conf, rpcHandler, false);
    +    this(conf, rpcHandler, false, false);
       }
       public TransportContext(
           TransportConf conf,
           RpcHandler rpcHandler,
           boolean closeIdleConnections) {
    +    this(conf, rpcHandler, closeIdleConnections, false);
    +  }
    +
    +  public TransportContext(
    +      TransportConf conf,
    +      RpcHandler rpcHandler,
    +      boolean closeIdleConnections,
    +      boolean isClient) {
    --- End diff --

    Sure... anything to make it clearer.
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218621553 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. 
When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet. 
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), +cause); +ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, + final ChunkFetchRequest msg) throws Exception { +Channel channel = ctx.channel(); +if (logger.isTraceEnabled()) { + logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), + msg.streamChunkId); +} +long chunksBeingTransferred = streamManager.chunksBeingTransferred(); +if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of chunks being transferred {} is above {}, close the connection.", + chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; +} +ManagedBuffer buf; +try { + streamManager.checkAuthorization(client, msg.streamChunkId.streamId); + streamManager.registerChannel(channel, msg.streamChunk
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218590278 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. 
When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet. 
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), +cause); +ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, + final ChunkFetchRequest msg) throws Exception { +Channel channel = ctx.channel(); +if (logger.isTraceEnabled()) { + logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), + msg.streamChunkId); +} +long chunksBeingTransferred = streamManager.chunksBeingTransferred(); +if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of chunks being transferred {} is above {}, close the connection.", + chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; +} +ManagedBuffer buf; +try { + streamManager.checkAuthorization(client, msg.streamChunkId.streamId); + streamManager.registerChannel(channel, msg.streamChunk
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22173#discussion_r218559203

    --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java ---
    @@ -281,4 +282,31 @@ public Properties cryptoConf() {
       public long maxChunksBeingTransferred() {
         return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE);
       }
    +
    +  /**
    +   * Percentage of io.serverThreads used by netty to process ChunkFetchRequest.
    +   * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
    +   * Although when calling the async writeAndFlush on the underlying channel to send
    +   * response back to client, the I/O on the channel is still being handled by
    +   * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup
    +   * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler
    +   * threads for the completion of sending back responses, we are able to put a limit on
    +   * the max number of threads from TransportServer's default EventLoopGroup that are
    +   * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive
    +   * and could take long time to process due to disk contentions. By configuring a slightly
    +   * higher number of shuffler server threads, we are able to reserve some threads for
    +   * handling other RPC messages, thus making the Client less likely to experience timeout
    +   * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores
    +   * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads
    +   * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads.
    +   */
    +  public int chunkFetchHandlerThreads() {
    +    if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
    +      return 0;
    +    }
    +    int chunkFetchHandlerThreadsPercent =
    +      conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
    --- End diff --

    Yes, it is documented above: if it is 0 or 100, the thread count is 2*#cores or io.serverThreads.
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r218163618 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. 
When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet. 
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), +cause); +ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, + final ChunkFetchRequest msg) throws Exception { +Channel channel = ctx.channel(); +if (logger.isTraceEnabled()) { + logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), + msg.streamChunkId); +} +long chunksBeingTransferred = streamManager.chunksBeingTransferred(); +if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of chunks being transferred {} is above {}, close the connection.", + chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; +} +ManagedBuffer buf; +try { + streamManager.checkAuthorization(client, msg.streamChunkId.streamId); + streamManager.registerChannel(channel, msg.streamChunk
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216074852 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.server; + +import java.net.SocketAddress; + +import com.google.common.base.Throwables; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.ChunkFetchFailure; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.ChunkFetchSuccess; +import org.apache.spark.network.protocol.Encodable; + +import static org.apache.spark.network.util.NettyUtils.*; + + +/** + * A dedicated ChannelHandler for processing ChunkFetchRequest messages. 
When sending response + * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying + * channel could potentially be blocked due to disk contentions. If several hundreds of clients + * send ChunkFetchRequest to the server at the same time, it could potentially occupying all + * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it + * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a + * result, it would leave no threads left to process other RPC messages, which takes much less + * time to process, and could lead to client timing out on either performing SASL authentication, + * registering executors, or waiting for response for an OpenBlocks messages. + */ +public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class); + + private final TransportClient client; + private final StreamManager streamManager; + /** The max number of chunks being transferred and not finished yet. 
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { +this.client = client; +this.streamManager = streamManager; +this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), +cause); +ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, + final ChunkFetchRequest msg) throws Exception { +Channel channel = ctx.channel(); +if (logger.isTraceEnabled()) { + logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), + msg.streamChunkId); +} +long chunksBeingTransferred = streamManager.chunksBeingTransferred(); +if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of chunks being transferred {} is above {}, close the connection.", + chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; +} +ManagedBuffer buf; +try { + streamManager.checkAuthorization(client, msg.streamChunkId.streamId); + streamManager.registerChannel(channel, msg.streamChunk
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216069578 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads + * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads. + */ + public int chunkFetchHandlerThreads() { +if(!this.getModuleName().equalsIgnoreCase("shuffle")) { + return 0; +} +int chunkFetchHandlerThreadsPercent = + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); +return this.serverThreads() > 0? 
(this.serverThreads() * chunkFetchHandlerThreadsPercent)/100: --- End diff -- I think it is a good idea to document both as this is an important config. Let me know your thoughts
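The percentage arithmetic described in the Javadoc above can be sketched as follows. This is a minimal standalone illustration, not the PR's code: the class and method names are hypothetical, and the fallback to 2 * #cores when io.serverThreads is unset is an assumption taken from the Javadoc wording, since the quoted diff cuts off before that branch.

```java
/** Hypothetical helper for illustration only; not part of TransportConf. */
final class ChunkFetchThreadsSketch {
  private ChunkFetchThreadsSketch() {}

  /**
   * @param serverThreads configured io.serverThreads (0 means "not set")
   * @param percent value of spark.shuffle.server.chunkFetchHandlerThreadsPercent
   * @param cores number of available cores, used only when serverThreads is 0
   */
  static int chunkFetchHandlerThreads(int serverThreads, int percent, int cores) {
    // Assumption: when io.serverThreads is unset, the base is 2 * #cores.
    int base = serverThreads > 0 ? serverThreads : 2 * cores;
    // Integer division truncates, so 10% of 16 threads yields 1.
    return (base * percent) / 100;
  }
}
```

With serverThreads = 40 and percent = 10 this returns 4. With serverThreads unset on an 8-core host it returns 1, and a percent of 0 returns 0, which the Javadoc describes as falling back to the default thread count.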
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r216068689 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java --- @@ -281,4 +282,31 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads --- End diff -- No, it depends on how many threads are required for other RPC calls. I have not tested them, but the whole point is to reduce the dependency on how much time the ChunkFetchRequests spend doing disk I/O
[GitHub] spark issue #22173: [SPARK-24335] Spark external shuffle server improvement ...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/22173 thanks @vanzin, also @tgravescs gentle ping...
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r212639516 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -144,14 +161,17 @@ public TransportChannelHandler initializePipeline( RpcHandler channelRpcHandler) { try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); + ChunkFetchRequestHandler chunkFetchHandler = createChunkFetchHandler(channelHandler, channelRpcHandler); channel.pipeline() .addLast("encoder", ENCODER) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this // would require more logic to guarantee if this were not part of the same event loop. -.addLast("handler", channelHandler); +.addLast("handler", channelHandler) +// Use a separate EventLoopGroup to handle ChunkFetchRequest messages. +.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler); --- End diff -- yes i did notice that... makes sense
[GitHub] spark issue #22173: [SPARK-24335] Spark external shuffle server improvement ...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/22173 @tgravescs @vanzin @Victsm please chime in thanks
[GitHub] spark pull request #22173: [SPARK-24335] Spark external shuffle server impro...
GitHub user redsanket opened a pull request: https://github.com/apache/spark/pull/22173 [SPARK-24335] Spark external shuffle server improvement to better handle block fetch requests. ## What changes were proposed in this pull request? This PR continues https://github.com/apache/spark/pull/21402. Since there has been no activity there, I am taking it over; I made a few minor changes and tested them. The description from the earlier PR follows. Description: Right now, the default number of server-side netty handler threads is 2 * # cores, and it can be further configured with the parameter spark.shuffle.io.serverThreads. Processing a client request requires one available server netty handler thread. However, when the server netty handler threads start to process ChunkFetchRequests, they will be blocked on disk I/O, mostly due to disk contention from the random read operations initiated by all the ChunkFetchRequests received from clients. As a result, when the shuffle server is serving many concurrent ChunkFetchRequests, the server-side netty handler threads could all be blocked on reading shuffle files, leaving no handler thread available to process other types of requests, which should all be very quick to process. This issue could potentially be fixed by limiting the number of netty handler threads that can get blocked when processing ChunkFetchRequests. We have a patch that does this by using a separate EventLoopGroup with a dedicated ChannelHandler to process ChunkFetchRequests. This lets the shuffle server reserve netty handler threads for non-ChunkFetchRequest messages, giving consistent processing time for these requests, which are fast to process. After deploying the patch in our infrastructure, we no longer see timeout issues with either executor registration with the local shuffle server or the shuffle client establishing a connection with a remote shuffle server. ## How was this patch tested? 
Unit tests and stress testing. You can merge this pull request into a Git repository by running: $ git pull https://github.com/redsanket/spark SPARK-24335 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22173.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22173 commit 44bb55759a4059d8bb0e60c361a8a3210a234f92 Author: Sanket Chintapalli Date: 2018-08-21T17:34:31Z SPARK-24355 Spark external shuffle server improvement to better handle block fetch requests. commit 3bab74ca84fe1b6682000741b958c8792f792472 Author: Sanket Chintapalli Date: 2018-08-21T16:49:50Z make chunk fetch handler threads as a percentage of transport server threads
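The idea in the description above, keeping slow chunk fetches from starving quick requests by giving them their own threads, can be sketched with plain JDK executors. This is only an analogy under stated assumptions: the two fixed thread pools stand in for Netty EventLoopGroups, the latch stands in for a contended disk read, and every name here is hypothetical rather than taken from the patch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Illustration only: plain executors stand in for Netty EventLoopGroups. */
final class SeparatePoolSketch {
  private SeparatePoolSketch() {}

  /**
   * Saturates a small "chunk fetch" pool with blocked tasks and reports whether
   * a quick RPC-style task on a different pool still completes.
   */
  static boolean rpcCompletesWhileFetchesBlock() {
    ExecutorService rpcPool = Executors.newFixedThreadPool(2);
    ExecutorService chunkFetchPool = Executors.newFixedThreadPool(2);
    CountDownLatch diskReadDone = new CountDownLatch(1);
    try {
      // Simulated slow disk reads: more tasks than chunk-fetch threads.
      for (int i = 0; i < 8; i++) {
        chunkFetchPool.submit(() -> {
          try {
            diskReadDone.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      // A cheap request runs on the other pool and does not wait for the reads.
      Future<String> rpc = rpcPool.submit(() -> "registered");
      return "registered".equals(rpc.get(5, TimeUnit.SECONDS));
    } catch (Exception e) {
      // Timeout, interruption, or task failure all count as "did not complete".
      return false;
    } finally {
      diskReadDone.countDown(); // release the simulated disk reads
      rpcPool.shutdown();
      chunkFetchPool.shutdown();
    }
  }
}
```

If both kinds of work shared one two-thread pool, the quick task would sit in the queue behind the blocked reads, which is the timeout scenario the description reports.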
[GitHub] spark issue #21402: SPARK-24355 Spark external shuffle server improvement to...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/21402 @Victsm @vanzin I want to get this going. Would it be better if I put up a PR that addresses the requested changes and concerns?
[GitHub] spark pull request #21636: Spark 24533
GitHub user redsanket opened a pull request: https://github.com/apache/spark/pull/21636 Spark 24533 ## What changes were proposed in this pull request? Typesafe has rebranded to Lightbend. This just changes the download path to avoid redirection. ## How was this patch tested? Tested by running build/mvn -DskipTests package You can merge this pull request into a Git repository by running: $ git pull https://github.com/redsanket/spark SPARK-24533 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21636.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21636 commit f08f74a3a774f9e2768f7924c4438516a4106b7c Author: Sanket Chintapalli Date: 2018-05-31T22:16:39Z Fix configuration specification for killBlacklisted executors commit a103673073e3114b9a28cf183a8d7ec9271769e3 Author: Sanket Chintapalli Date: 2018-06-11T18:34:06Z change description commit 1a654b4cfce4d0d8faa3ba6f14a3ba11dcc8ccd1 Author: Sanket Chintapalli Date: 2018-06-12T14:32:02Z wrap text commit 906e0654d5577a36c9db8de4118b749299e38c53 Author: Sanket Chintapalli Date: 2018-06-25T14:14:28Z Merge branch 'master' of github.com:apache/spark commit ad453b2b510fba71b3e883c185a082ac7bfce69b Author: Sanket Chintapalli Date: 2018-06-25T14:15:51Z move typesafe to lightbend
[GitHub] spark pull request #21475: [SPARK-24416] Fix configuration specification for...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/21475#discussion_r194760044 --- Diff: docs/configuration.md --- @@ -1629,8 +1629,10 @@ Apart from these, the following properties are also available, and may be useful spark.blacklist.killBlacklistedExecutors false -(Experimental) If set to "true", allow Spark to automatically kill, and attempt to re-create, -executors when they are blacklisted. Note that, when an entire node is added to the blacklist, +(Experimental) If set to "true", allow Spark to automatically kill the +executors when they are blacklisted on fetch failure or blacklisted for the entire application, +as controlled by spark.blacklist.application.*. +Note that, when an entire node is added to the blacklist, --- End diff -- ok, i checked the README.md and it did not spoil the indentation. No issues :)
[GitHub] spark issue #21475: [SPARK-24416] Fix configuration specification for killBl...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/21475 @tgravescs @squito fixed the description hope you can take a look thanks
[GitHub] spark issue #21475: [SPARK-24416] Fix configuration specification for killBl...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/21475 That description might work too, @squito. If that is ok with @tgravescs I can rephrase it... Just wanted it to be explicit
[GitHub] spark issue #21468: [SPARK-22151] : PYTHONPATH not picked up from the spark....
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/21468 LGTM @pgandhi999 Hope @tgravescs can confirm it
[GitHub] spark pull request #21476: [SPARK-24446][yarn] Properly quote library path f...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/21476#discussion_r192804713 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1485,6 +1486,22 @@ private object Client extends Logging { YarnAppReport(report.getYarnApplicationState(), report.getFinalApplicationStatus(), diagsOpt) } + /** + * Create a properly quoted library path string to be added as a prefix to the command executed by + * YARN. This is different from plain quoting due to YARN executing the command through "bash -c". + */ + def createLibraryPathPrefix(libpath: String, conf: SparkConf): String = { +val cmdPrefix = if (Utils.isWindows) { + Utils.libraryPathEnvPrefix(Seq(libpath)) +} else { + val envName = Utils.libraryPathEnvName + // For quotes, escape both the quote and the escape character when encoding in the command + // string. + val quoted = libpath.replace("\"", "\\\"") --- End diff -- Dumb question: I think this is escaping "\"" => "\\\"". Not sure why we have so many escapes otherwise. Just trying to understand; apart from that the PR looks good
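To make the escape counting in the quoted line concrete, here is a minimal sketch of what that single replace call produces. It assumes only the line shown in the diff, not the rest of createLibraryPathPrefix, and the class and method names are hypothetical.

```java
/** Illustration only: shows what the single replace call in the diff produces. */
final class QuoteEscapeSketch {
  private QuoteEscapeSketch() {}

  static String escapeQuotes(String libpath) {
    // The first literal is the one-character string holding a double quote.
    // The second literal is two characters: a backslash followed by a double quote.
    return libpath.replace("\"", "\\\"");
  }
}
```

Each double quote in the input gains one preceding backslash, so a three-character input consisting of a, a double quote, and b comes back as four characters.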
[GitHub] spark issue #21475: [SPARK-24416] Fix configuration specification for killBl...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/21475 @tgravescs plz review thanks
[GitHub] spark pull request #21475: [SPARK-24416] Fix configuration specification for...
GitHub user redsanket opened a pull request: https://github.com/apache/spark/pull/21475 [SPARK-24416] Fix configuration specification for killBlacklisted executors ## What changes were proposed in this pull request? spark.blacklist.killBlacklistedExecutors is defined as (Experimental) If set to "true", allow Spark to automatically kill, and attempt to re-create, executors when they are blacklisted. Note that, when an entire node is added to the blacklist, all of the executors on that node will be killed. I presume the killing of blacklisted executors only happens after the stage completes successfully and all tasks have completed, or on fetch failures (updateBlacklistForFetchFailure/updateBlacklistForSuccessfulTaskSet). The definition is confusing because it states that Spark will attempt to re-create the executor as soon as it is blacklisted. That is not true: if an executor is blacklisted while the stage is in progress, Spark will not attempt to clean it up until the stage finishes. You can merge this pull request into a Git repository by running: $ git pull https://github.com/redsanket/spark SPARK-24416 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21475.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21475 commit f08f74a3a774f9e2768f7924c4438516a4106b7c Author: Sanket Chintapalli Date: 2018-05-31T22:16:39Z Fix configuration specification for killBlacklisted executors
[GitHub] spark pull request #19103: [SPARK-21890] Credentials not being passed to add...
Github user redsanket closed the pull request at: https://github.com/apache/spark/pull/19103
[GitHub] spark issue #19140: [SPARK-21890] Credentials not being passed to add the to...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/19140 Added principal check back and tested in secure hadoop env. Let me know if this looks fine with you @jerryshao @vanzin @tgravescs
[GitHub] spark issue #19140: [SPARK-21890] Credentials not being passed to add the to...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/19140 @jerryshao yes will do no issues thanks
[GitHub] spark issue #19140: [SPARK-21890] Credentials not being passed to add the to...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/19140 Previous discussion on this PR is here https://github.com/apache/spark/pull/19103
[GitHub] spark pull request #19140: [SPARK-21890] Credentials not being passed to add...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/19140#discussion_r137096611 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala --- @@ -103,15 +103,17 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration private def getTokenRenewalInterval( hadoopConf: Configuration, - filesystems: Set[FileSystem]): Option[Long] = { + filesystems: Set[FileSystem], + creds:Credentials): Option[Long] = { // We cannot use the tokens generated with renewer yarn. Trying to renew // those will fail with an access control issue. So create new tokens with the logged in // user as renewer. -val creds = fetchDelegationTokens( +val fetchCreds = fetchDelegationTokens( --- End diff -- Also, the difference here between spark2.2 and master is that master is missing the PRINCIPAL (aka spark.yarn.principal) config. Not sure if we need to do this now. Let me know your opinion @vanzin @tgravescs sparkConf.get(PRINCIPAL).flatMap { renewer => val creds = new Credentials() hadoopFSsToAccess(hadoopConf, sparkConf).foreach { dst => val dstFs = dst.getFileSystem(hadoopConf) dstFs.addDelegationTokens(renewer, creds) }
[GitHub] spark issue #19103: [SPARK-21890] Credentials not being passed to add the to...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/19103 @vanzin @tgravescs sorry for the delay. I put up a PR against master with just the workaround: https://github.com/apache/spark/pull/19140. We can move further discussion of the suggested improvements there.
[GitHub] spark pull request #19140: [SPARK-21890] Credentials not being passed to add...
GitHub user redsanket opened a pull request: https://github.com/apache/spark/pull/19140 [SPARK-21890] Credentials not being passed to add the tokens I observed this while running an Oozie job trying to connect to HBase via Spark. It looks like the creds are not being passed in at https://github.com/apache/spark/blob/branch-2.2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala#L53 for the 2.2 release. More info as to why it fails on a secure grid: the Oozie client gets the tokens the application needs before launching. It passes those tokens along to the Oozie launcher job (an MR job), which then calls the Spark client to launch the Spark app and passes the tokens along. The Oozie launcher job cannot get any more tokens because all it has is tokens (you can't get tokens with tokens; you need a TGT or keytab). The error occurs because the launcher job runs the Spark client to submit the Spark job, but the Spark client doesn't see that it already has the HDFS tokens, so it tries to get more, which ends with the exception. A change in SPARK-19021 generalized the HDFS credentials provider so that the existing credentials are no longer passed into the call to get tokens, so it doesn't realize it already has the necessary tokens. 
https://issues.apache.org/jira/browse/SPARK-21890 Modified to pass creds to get delegation tokens You can merge this pull request into a Git repository by running: $ git pull https://github.com/redsanket/spark SPARK-21890-master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19140.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19140 commit ebbcb7f887557e09eacc56dbd7bb88da445b8fa2 Author: Sanket Chintapalli <schin...@yahoo-inc.com> Date: 2017-09-01T20:32:20Z Credentials not being passed to gather the tokens commit 0cfca504e3ee30c1cb62ae5976a7784292418f45 Author: Sanket Chintapalli <schin...@yahoo-inc.com> Date: 2017-09-05T19:43:50Z Feel fetchCreds is appropriate naming convention
[GitHub] spark pull request #19103: [SPARK-21890]
GitHub user redsanket opened a pull request: https://github.com/apache/spark/pull/19103 [SPARK-21890] ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-21890 Modified to pass creds to get delegation tokens ## How was this patch tested? Manual testing on our secure cluster You can merge this pull request into a Git repository by running: $ git pull https://github.com/redsanket/spark SPARK-21890 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19103.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19103 commit 7043d98ccb7f5cfc4e854f609afa3c380d274c36 Author: Sanket Chintapalli <schin...@yahoo-inc.com> Date: 2017-08-31T21:20:34Z Fix creds not being passed issue to add delegation tokens for a filesystem --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
[GitHub] spark issue #18940: [SPARK-21501] Change CacheLoader to limit entries based ...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/18940 @vanzin addressed the config comment thanks
[GitHub] spark issue #18940: [SPARK-21501] Change CacheLoader to limit entries based ...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/18940 Thanks @vanzin @kiszk will do, makes sense to me now
[GitHub] spark issue #18940: [SPARK-21501] Change CacheLoader to limit entries based ...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/18940 @kiszk wouldn't the updated release notes/docs take care of that, by stating which configs can still be used and which cannot? I don't mind adding a warning message asking users to use cache.size instead of cache.entries, or providing two alternate implementations based on entries/size. I would like to see what other PMC members think about this @tgravescs @vanzin
[GitHub] spark issue #18940: [SPARK-21501] Change CacheLoader to limit entries based ...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/18940 @kiszk I don't think that would be ideal. It is better to backport the feature itself to the desired version or branch; having two conflicting configs for the same task is not ideal, if that is what you mean. Thanks.
[GitHub] spark issue #18940: SPARK-21501 Change CacheLoader to limit entries based on...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/18940 @dbolshak there were no unit tests for the Google cache implementation here before. I could add a simple test that checks the cache behavior if necessary, but ideally a scale test is needed to understand the shuffleIndexCache behavior.
[GitHub] spark pull request #18940: SPARK-21501 Change CacheLoader to limit entries b...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/18940#discussion_r133220047 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java --- @@ -104,15 +105,22 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF Executor directoryCleaner) throws IOException { this.conf = conf; this.registeredExecutorFile = registeredExecutorFile; -int indexCacheEntries = conf.getInt("spark.shuffle.service.index.cache.entries", 1024); +String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m"); CacheLoader<File, ShuffleIndexInformation> indexCacheLoader = new CacheLoader<File, ShuffleIndexInformation>() { public ShuffleIndexInformation load(File file) throws IOException { return new ShuffleIndexInformation(file); } }; -shuffleIndexCache = CacheBuilder.newBuilder() - .maximumSize(indexCacheEntries).build(indexCacheLoader); +shuffleIndexCache = +CacheBuilder.newBuilder() + .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize)) --- End diff -- yeah, the previous code is what led me to follow that convention. OK, will revert to 2-space indentation, thanks
[GitHub] spark pull request #18940: YSPARK-734 Change CacheLoader to limit entries ba...
GitHub user redsanket opened a pull request: https://github.com/apache/spark/pull/18940 YSPARK-734 Change CacheLoader to limit entries based on memory footprint Right now the Spark shuffle service has a cache for index files. It is bounded by the number of files cached (spark.shuffle.service.index.cache.entries). This can cause issues if people have a lot of reducers, because the size of each entry fluctuates with the number of reducers. We saw an issue with a job that had 17 reducers: it caused the NodeManager (NM) running the Spark shuffle service to use 700-800MB of memory in the NM by itself. We should change this cache to be memory based and only allow a certain amount of memory to be used. By memory based I mean the cache should have a limit of, say, 100MB. https://issues.apache.org/jira/browse/SPARK-21501 Manual testing with 17 reducers was performed with the cache loaded up to the default 100MB limit and each shuffle index file 1.3MB in size. Eviction takes place as soon as the total cache size reaches the 100MB limit, and the evicted objects become eligible for garbage collection, thereby keeping the NM from crashing. No notable difference in runtime was observed. You can merge this pull request into a Git repository by running: $ git pull https://github.com/redsanket/spark SPARK-21501 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18940.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18940 commit f23a4c79b69fd1f8a77162da34b8821cb0cc1352 Author: Sanket Chintapalli <schin...@yahoo-inc.com> Date: 2017-07-27T14:59:40Z YSPARK-734 Change CacheLoader to limit entries based on memory footprint
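To make the difference between an entry-count limit and a memory limit concrete, here is a minimal sketch of a weight-bounded LRU structure. It is only an illustration: the patch itself relies on Guava's CacheBuilder, whereas this stand-in uses a plain JDK LinkedHashMap, and the class name WeightBoundedCache and its methods are hypothetical. The sizes in the usage note mirror the 100MB limit and 1.3MB index files mentioned in the manual test.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a weight-bounded cache; not Spark's or Guava's code. */
class WeightBoundedCache<K> {
  private final long maxWeightBytes;
  private long currentWeightBytes = 0;
  // accessOrder = true keeps the least recently used entry at the head.
  private final LinkedHashMap<K, Long> entries = new LinkedHashMap<>(16, 0.75f, true);

  WeightBoundedCache(long maxWeightBytes) {
    this.maxWeightBytes = maxWeightBytes;
  }

  /** Records an entry of the given size, then evicts LRU entries until the total fits. */
  void put(K key, long sizeBytes) {
    Long previous = entries.put(key, sizeBytes);
    if (previous != null) {
      currentWeightBytes -= previous;
    }
    currentWeightBytes += sizeBytes;
    Iterator<Map.Entry<K, Long>> it = entries.entrySet().iterator();
    while (currentWeightBytes > maxWeightBytes && it.hasNext()) {
      Map.Entry<K, Long> eldest = it.next();
      currentWeightBytes -= eldest.getValue();
      it.remove();
    }
  }

  boolean contains(K key) {
    return entries.containsKey(key);
  }

  int size() {
    return entries.size();
  }

  long weightBytes() {
    return currentWeightBytes;
  }
}
```

With a limit of 100 * 1024 * 1024 bytes and entries of 1363149 bytes each (roughly 1.3MB), inserting 100 entries leaves only the most recent 76 resident, because a 77th would push the total past the limit. An entry-count limit of 1024 would have kept all of them regardless of their size.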
[GitHub] spark issue #17658: [SPARK-20355] Add per application spark version on the h...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/17658 @tgravescs @vanzin ready for merge?
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r115097978 --- Diff: core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json --- @@ -22,6 +23,7 @@ "duration" : 101795, "sparkUser" : "jose", "completed" : true, +"appSparkVersion" : "", --- End diff -- Ah, OK: the UI will work because it is replaying the events, but the REST endpoint would break because it does not let the event pass through. Thanks @vanzin
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r115097526 --- Diff: core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json --- @@ -22,6 +23,7 @@ "duration" : 101795, "sparkUser" : "jose", "completed" : true, +"appSparkVersion" : "", --- End diff -- OK, doPost is posting the events: 7/05/05 17:17:17.265 qtp927704210-1183 INFO ReplayListenerBus: eventInDoPost SparkListenerLogStart(1.4.0-SNAPSHOT) So that part should be good. It looks like ApplicationEventListener is not able to read the events. It used to, so something has changed; I will dig deeper.
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r115096536 --- Diff: core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json --- @@ -22,6 +23,7 @@ "duration" : 101795, "sparkUser" : "jose", "completed" : true, +"appSparkVersion" : "", --- End diff -- I see events being passed to ApplicationEventListener in the debug logs, but interestingly doPostEvent does not seem to post them to the listener, which is a bit odd. I am not sure whether I need to change the contract for the tests to pick this up, though. 17/05/05 16:39:42.235 qtp1618269752-1127 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.jobs.JobProgressListener@53e58df ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:42.235 qtp1618269752-1127 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.env.EnvironmentListener@53371e87 ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:42.235 qtp1618269752-1127 INFO ReplayListenerBus: listener --- event org.apache.spark.storage.StorageStatusListener@394665e7 ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:42.235 qtp1618269752-1127 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.exec.ExecutorsListener@2d92f38e ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:42.235 qtp1618269752-1127 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.storage.StorageListener@5a9dbe43 ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:42.235 qtp1618269752-1127 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.scope.RDDOperationGraphListener@31b550b1 ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:42.236 qtp1618269752-1127 INFO ReplayListenerBus: listener --- event org.apache.spark.scheduler.ApplicationEventListener@3b56723d ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:43.300 qtp1618269752-1126 INFO ReplayListenerBus: listener --- event 
org.apache.spark.ui.jobs.JobProgressListener@3647a865 ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:43.300 qtp1618269752-1126 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.env.EnvironmentListener@4357e8cc ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:43.300 qtp1618269752-1126 INFO ReplayListenerBus: listener --- event org.apache.spark.storage.StorageStatusListener@5062f4c0 ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:43.300 qtp1618269752-1126 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.exec.ExecutorsListener@273c097f ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:43.300 qtp1618269752-1126 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.storage.StorageListener@2c33997d ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:43.300 qtp1618269752-1126 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.scope.RDDOperationGraphListener@505e15bf ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:43.300 qtp1618269752-1126 INFO ReplayListenerBus: listener --- event org.apache.spark.scheduler.ApplicationEventListener@175d3f80 ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:44.305 qtp1618269752-1131 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.jobs.JobProgressListener@71dacceb ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:44.305 qtp1618269752-1131 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.env.EnvironmentListener@6f41235c ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:44.305 qtp1618269752-1131 INFO ReplayListenerBus: listener --- event org.apache.spark.storage.StorageStatusListener@3cdcb6d0 ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:44.305 qtp1618269752-1131 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.exec.ExecutorsListener@4abcd5c6 ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:44.305 qtp1618269752-1131 INFO ReplayListenerBus: listener --- event 
org.apache.spark.ui.storage.StorageListener@6d26f5ee ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:44.305 qtp1618269752-1131 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.scope.RDDOperationGraphListener@b1b60d5 ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:44.305 qtp1618269752-1131 INFO ReplayListenerBus: listener --- event org.apache.spark.scheduler.ApplicationEventListener@52283d50 ---SparkListenerLogStart(2.3.0-SNAPSHOT) 17/05/05 16:39:44.676 qtp1478318899-1195 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.jobs.JobProgressListener@f1563e7 ---SparkListenerLogStart(1.4.0-SNAPSHOT) 17/05/05 16:39:44.676 qtp1478318899-1195 INFO ReplayListenerBus: listener --- event org.apache.spark.ui.env.EnvironmentListener@91
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r114924015 --- Diff: core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json --- @@ -22,6 +23,7 @@ "duration" : 101795, "sparkUser" : "jose", "completed" : true, +"appSparkVersion" : "", --- End diff -- I could probably change the default value. OK, will do it.
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r114921697 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -283,10 +283,15 @@ private[spark] object EventLoggingListener extends Logging { * * @param logStream Raw output stream to the event log file. */ - def initEventLog(logStream: OutputStream): Unit = { + def initEventLog(logStream: OutputStream, testing: Boolean, + loggedEvents: ArrayBuffer[JValue]): Unit = { val metadata = SparkListenerLogStart(SPARK_VERSION) -val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n" +val eventJson = JsonProtocol.logStartToJson(metadata) +val metadataJson = compact(eventJson) + "\n" logStream.write(metadataJson.getBytes(StandardCharsets.UTF_8)) +if (testing && loggedEvents != null) { + loggedEvents += eventJson --- End diff -- I thought loggedEvents only takes JSON values. Also, the loggedEvents are generated here as part of the SparkContext and probably through other sources. The ReplayListenerSuite, however, compares the original events with the replayed events. Here the replayed events are written to the event log, but loggedEvents will not contain the SparkListenerLogStart event, as it is not a part of SparkContext, if I understand it correctly.
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r114921013 --- Diff: core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json --- @@ -22,6 +23,7 @@ "duration" : 101795, "sparkUser" : "jose", "completed" : true, +"appSparkVersion" : "", --- End diff -- I am not sure whether the tests hit this code path https://github.com/apache/spark/pull/17658/files#diff-a7befb99e7bd7e3ab5c46c2568aa5b3eR474, so they take the default value.
[GitHub] spark issue #17658: [SPARK-20355] Add per application spark version on the h...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/17658 I think I should set up my IDE. It would be nice to have something like a checkstyle.xml instead of configuring these things by hand. Sorry about that.
[GitHub] spark issue #17658: [SPARK-20355] Add per application spark version on the h...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/17658 SparkContext was not able to read the SparkListenerLogStart event, as the event is not a part of it, and the subsequent replay listener suite tries to compare the event logs with the original events emitted via SparkContext.
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r114647969 --- Diff: core/src/main/scala/org/apache/spark/ui/SparkUI.scala --- @@ -60,6 +60,8 @@ private[spark] class SparkUI private ( var appId: String = _ + var appSparkVersion = "" --- End diff -- Yeah will fix that thanks
[GitHub] spark issue #17658: [SPARK-20355] Add per application spark version on the h...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/17658 The issue before was that I was replaying the events before adding the listeners in getSparkUI. That caused a bunch of tests to fail. The other failures were due to appSparkVersion not being set to "" in the expectation JSON files.
[GitHub] spark issue #17658: [SPARK-20355] Add per application spark version on the h...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/17658 OK, it looks like I did not modify the expectation.json files. Will fix them.
[GitHub] spark issue #17658: [SPARK-20355] Add per application spark version on the h...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/17658 Jenkins, test this please
[GitHub] spark issue #17658: [SPARK-20355] Add per application spark version on the h...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/17658 OK, will add it to either RuntimeInfo or ApplicationInfo. I thought it might break the contract underneath, but if it doesn't then I should add it here. Thanks for the input, will do.
[GitHub] spark issue #17658: [SPARK-20355] Add per application spark version on the h...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/17658 @vanzin Can I add this to SparkConf.scala https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L58, just to have the application info there? That would expose the info through the API without modifying the intended interface. As I see it, modifying the ApplicationInfo API would have consequences wherever it is used and might be a bigger change. Also, adding it to RuntimeInfo would be awkward, like you said; the conf would be a nice place for it to live without major changes. Let me know your opinion and I can make the changes.
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r113299115 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala --- @@ -57,4 +58,10 @@ private[spark] class ApplicationEventListener extends SparkListener { adminAclsGroups = allProperties.get("spark.admin.acls.groups") } } + + override def onOtherEvent(event:SparkListenerEvent):Unit = event match { --- End diff -- Oh, OK. I thought I had the space before the style check complained about it, but I might have interpreted it wrongly.
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r113298757 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala --- @@ -57,4 +58,10 @@ private[spark] class ApplicationEventListener extends SparkListener { adminAclsGroups = allProperties.get("spark.admin.acls.groups") } } + + override def onOtherEvent(event:SparkListenerEvent):Unit = event match { --- End diff -- The Scala style check failed for this in the above run: Scalastyle checks failed at following occurrences: [error] /home/jenkins/workspace/SparkPullRequestBuilder/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala:62:33: No space after token : [error] (core/compile:scalastyle) errors exist [error] Total time: 11 s, completed Apr 17, 2017 8:39:08 AM [error] running /home/jenkins/workspace/SparkPullRequestBuilder/dev/lint-scala ; received return code 1 Attempting to post to Github...
[GitHub] spark issue #17658: [SPARK-20355] Add per application spark version on the h...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/17658 Jenkins, test this please
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r112726055 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala --- @@ -160,7 +160,6 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent /** * An internal class that describes the metadata of an event log. - * This event is not meant to be posted to listeners downstream. --- End diff -- This was only for metadata info, so when it was written it was simply not meant to be consumed, but now we can reuse it for this case.
[GitHub] spark issue #17658: [SPARK-20355] Add per application spark version on the h...
Github user redsanket commented on the issue: https://github.com/apache/spark/pull/17658 @vanzin sure will address the concerns thanks for the review
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r112268780 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala --- @@ -71,7 +71,6 @@ private[spark] trait SparkListenerBus listener.onNodeUnblacklisted(nodeUnblacklisted) case blockUpdated: SparkListenerBlockUpdated => listener.onBlockUpdated(blockUpdated) - case logStart: SparkListenerLogStart => // ignore event log metadata --- End diff -- I am not sure whether it will change any behavior. That is precisely why we post it to other events: in case someone wants to listen for it and use the event, as in this scenario.
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
Github user redsanket commented on a diff in the pull request: https://github.com/apache/spark/pull/17658#discussion_r112067747 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala --- @@ -71,7 +71,6 @@ private[spark] trait SparkListenerBus listener.onNodeUnblacklisted(nodeUnblacklisted) case blockUpdated: SparkListenerBlockUpdated => listener.onBlockUpdated(blockUpdated) - case logStart: SparkListenerLogStart => // ignore event log metadata --- End diff -- I do not see it being consumed apart from registering some metadata, so I guess it should be fine, as this event already logs the version.
[GitHub] spark pull request #17658: [SPARK-20355] Add per application spark version o...
GitHub user redsanket opened a pull request: https://github.com/apache/spark/pull/17658 [SPARK-20355] Add per application spark version on the history server headerpage ## What changes were proposed in this pull request? The Spark version for a specific application is currently not displayed on the history page. It would be nice to show that application's Spark version on the UI when we click on a specific application. There seems to be a way to do this, as SparkListenerLogStart records the application version, so it should be trivial to listen to this event and surface the version on the UI. For example: https://cloud.githubusercontent.com/assets/8295799/25092588/fd53325e-2353-11e7-9ac7-ba304f81ba1a.png https://cloud.githubusercontent.com/assets/8295799/25092595/0549aace-2354-11e7-80a7-e044da2d5e0f.png {"Event":"SparkListenerLogStart","Spark Version":"2.0.0"} Modified the SparkUI for the history server to listen to the SparkListenerLogStart event, extract the version, and display it. ## How was this patch tested? Manual testing of the UI page. Attaching the UI screenshot changes here. You can merge this pull request into a Git repository by running: $ git pull https://github.com/redsanket/spark SPARK-20355 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17658.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17658 commit 1f50b2750714bfcb2c77b9932ed7c5fca3d7cfa3 Author: Sanket <schintap@untilservice-lm> Date: 2017-04-06T13:50:22Z Add per application spark version on the history server headerpage
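As a rough illustration of what the SparkListenerLogStart line in the description carries, the sketch below pulls the version string out of one event-log line. It is an assumption-laden stand-in, not the patch: Spark parses events through its own JsonProtocol and delivers them to listeners, whereas this uses a plain JDK regular expression that only handles the exact two-field shape quoted above. The class LogStartVersion and the method sparkVersion are hypothetical names.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper; a regex shortcut rather than Spark's real JSON parsing. */
class LogStartVersion {
  // Matches only the two-field form shown in the PR description.
  private static final Pattern LOG_START = Pattern.compile(
      "\\{\\s*\"Event\"\\s*:\\s*\"SparkListenerLogStart\"\\s*,"
          + "\\s*\"Spark Version\"\\s*:\\s*\"([^\"]*)\"\\s*\\}");

  /** Returns the recorded version, or empty if the line is not a log-start event. */
  static Optional<String> sparkVersion(String eventLine) {
    Matcher m = LOG_START.matcher(eventLine.trim());
    return m.matches() ? Optional.of(m.group(1)) : Optional.empty();
  }
}
```

Calling sparkVersion on the sample line from the description yields the string 2.0.0, while any other event line yields an empty Optional, which roughly corresponds to leaving the version field at its default.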
[GitHub] spark pull request #12152: [SPARK-14279] Spark Version will be picked from p...
Github user redsanket closed the pull request at: https://github.com/apache/spark/pull/12152
[GitHub] spark pull request: [SPARK-14279] Spark Version will be picked fro...
Github user redsanket commented on the pull request: https://github.com/apache/spark/pull/12152#issuecomment-205611695 @vanzin The idea was to make it pluggable so that other projects can use it if necessary. If we do not want additional information about the build, it can be simplified, but I think it would be nice to have this information. I will look into the antrun plugin, thank you.
[GitHub] spark pull request: [SPARK-14279] Spark Version will be picked fro...
GitHub user redsanket opened a pull request:

    https://github.com/apache/spark/pull/12152

    [SPARK-14279] Spark Version will be picked from pom.version

    ## What changes were proposed in this pull request?

    This PR proposes to pick the version information for SPARK from pom.version. It also proposes to include other relevant build details.

    ## How was this patch tested?

    spark-submit --version should give the version information and also build details

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/redsanket/spark SPARK-14279

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/12152.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #12152

commit 209640029c3d18329dbd9a8a9fdcaec9b3babc8c
Author: Sanket <schintap@untilservice-lm>
Date: 2016-04-04T15:26:00Z

    Spark Version will be picked from pom.version and other relavent info related to build and compilation will be displayed
[GitHub] spark pull request: [SPARK-6166] Limit number of in flight outboun...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10838#discussion_r53812741

    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -328,7 +345,9 @@ final class ShuffleBlockFetcherIterator(
       private def fetchUpToMaxBytes(): Unit = {
         // Send fetch requests up to maxBytesInFlight
         while (fetchRequests.nonEmpty &&
    -      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    +      (bytesInFlight == 0 ||
    +        (reqsInFlight + 1 <= maxReqsInFlight &&
    --- End diff --

    @zsxwing My account name is sanket991
[GitHub] spark pull request: [SPARK-6166] Limit number of in flight outboun...
Github user redsanket commented on the pull request: https://github.com/apache/spark/pull/10838#issuecomment-182911828 @zsxwing rebased and changed ArrayBuffer to HashSet @tgravescs might want to take a look at it one more time
[GitHub] spark pull request: [SPARK-6166] Limit number of in flight outboun...
Github user redsanket commented on the pull request: https://github.com/apache/spark/pull/10838#issuecomment-183063914 @zsxwing addressed
[GitHub] spark pull request: [SPARK-6166] Limit number of in flight outboun...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10838#discussion_r52518306

    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -143,9 +148,11 @@ final class ShuffleBlockFetcherIterator(
         logDebug("Sending request for %d blocks (%s) from %s".format(
           req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
         bytesInFlight += req.size
    +    reqsInFlight += 1
         // so we can look up the size of each blockID
         val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
    +    val remainingBlocks = new ArrayBuffer[String]() ++ sizeMap.keys
    --- End diff --

    @zsxwing Just curious: from looking at the Scala API, I presume both ArrayBuffer and HashSet are thread safe, and blockId seems to be unique. Why do we need to change it? Otherwise I don't mind changing it.
[GitHub] spark pull request: [SPARK-6166] Limit number of in flight outboun...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10838#discussion_r52564828

    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -143,9 +148,11 @@ final class ShuffleBlockFetcherIterator(
         logDebug("Sending request for %d blocks (%s) from %s".format(
           req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
         bytesInFlight += req.size
    +    reqsInFlight += 1
         // so we can look up the size of each blockID
         val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
    +    val remainingBlocks = new ArrayBuffer[String]() ++ sizeMap.keys
    --- End diff --

    yup
[GitHub] spark pull request: [SPARK-6166] Limit number of in flight outboun...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10838#discussion_r52542439

    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -143,9 +148,11 @@ final class ShuffleBlockFetcherIterator(
         logDebug("Sending request for %d blocks (%s) from %s".format(
           req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
         bytesInFlight += req.size
    +    reqsInFlight += 1
         // so we can look up the size of each blockID
         val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
    +    val remainingBlocks = new ArrayBuffer[String]() ++ sizeMap.keys
    --- End diff --

    OK, thanks, no problem. That was an obvious question, mostly an end-of-day tired query. Sorry!
[GitHub] spark pull request: [SPARK-6166] Limit number of in flight outboun...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10838#discussion_r52541897

    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -143,9 +148,11 @@ final class ShuffleBlockFetcherIterator(
         logDebug("Sending request for %d blocks (%s) from %s".format(
           req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
         bytesInFlight += req.size
    +    reqsInFlight += 1
         // so we can look up the size of each blockID
         val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
    +    val remainingBlocks = new ArrayBuffer[String]() ++ sizeMap.keys
    --- End diff --

    But for ArrayBuffer, prepends and deletes are linear.
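The point about linear deletes can be illustrated with a minimal sketch. It is not code from the pull request: the object name, the helper names, and the block ID strings are hypothetical, and only `ArrayBuffer`, `HashSet`, and the idea of a `remainingBlocks` collection come from the thread. Removing an element from an `ArrayBuffer` has to search for it and shift the tail, whereas removing from a mutable `HashSet` is an expected constant-time hash lookup.

```scala
import scala.collection.mutable.{ArrayBuffer, HashSet}

object RemainingBlocksSketch {
  // Hypothetical block IDs, standing in for `sizeMap.keys` in the diff.
  val blockIds: Seq[String] = (0 until 5).map(i => s"shuffle_0_${i}_0")

  // ArrayBuffer: `-=` scans for the element and shifts the tail (linear time).
  def remainingAsBuffer(done: String): ArrayBuffer[String] = {
    val remaining = ArrayBuffer(blockIds: _*)
    remaining -= done
    remaining
  }

  // HashSet: `-=` is a hash lookup (expected constant time).
  def remainingAsSet(done: String): HashSet[String] = {
    val remaining = HashSet(blockIds: _*)
    remaining -= done
    remaining
  }

  def main(args: Array[String]): Unit = {
    println(remainingAsBuffer("shuffle_0_2_0").size)
    println(remainingAsSet("shuffle_0_2_0").contains("shuffle_0_2_0"))
  }
}
```

Both helpers leave four block IDs behind; the difference is only in the cost of each removal, which matters when a request carries many blocks and each one is removed as it arrives.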
[GitHub] spark pull request: [SPARK-6166] Limit number of in flight outboun...
Github user redsanket commented on the pull request: https://github.com/apache/spark/pull/10838#issuecomment-178209754 @zsxwing Updated
[GitHub] spark pull request: [SPARK-6166] Limit number of concurrent outbou...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10838#discussion_r50576595

    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -258,6 +268,9 @@ final class ShuffleBlockFetcherIterator(
         val remoteRequests = splitLocalRemoteBlocks()
         // Add the remote requests into our queue in a random order
         fetchRequests ++= Utils.randomize(remoteRequests)
    +    assert ((0 == reqsInFlight) == (0 == bytesInFlight),
    +      "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +
    +      ", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight)
    --- End diff --

    @holdenk Improved the assert information; can you take a look at it?
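The invariant behind that assert can be read as "the two counters are either both zero or both non-zero". The following is a minimal standalone sketch of that check, not the code in `ShuffleBlockFetcherIterator`: the object and method names are hypothetical, while the condition and the message text are the ones quoted in the diff.

```scala
object InFlightInvariantSketch {
  // True when both counters are zero or both are non-zero.
  def consistent(reqsInFlight: Int, bytesInFlight: Long): Boolean =
    (0 == reqsInFlight) == (0L == bytesInFlight)

  // Same shape as the assert in the diff; throws AssertionError on a mismatch.
  def check(reqsInFlight: Int, bytesInFlight: Long): Unit =
    assert(consistent(reqsInFlight, bytesInFlight),
      "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +
      ", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight)

  def main(args: Array[String]): Unit = {
    check(0, 0L)     // nothing in flight: passes
    check(2, 1024L)  // requests and bytes both in flight: passes
    println(consistent(1, 0L)) // a request counted without bytes: inconsistent
  }
}
```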
[GitHub] spark pull request: [SPARK-6166] Limit number of concurrent outbou...
GitHub user redsanket opened a pull request:

    https://github.com/apache/spark/pull/10838

    [SPARK-6166] Limit number of concurrent outbound connections

    This JIRA is related to https://github.com/apache/spark/pull/5852. Had to do some minor rework and testing to make sure it works with the current version of Spark.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/redsanket/spark limit-outbound-connections

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10838.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #10838

commit 32063006c28e4e94c6005e559e03465a1ce41e81
Author: Sanket <schintap@untilservice-lm>
Date: 2016-01-19T21:38:51Z

    Limit number of concurrent outbound connections

commit 4b2bbd83f4fe02375f7ccfd73e091e000b3aae7b
Author: Sanket <schintap@untilservice-lm>
Date: 2016-01-19T21:46:12Z

    merge resolution from upstream:master

commit 9761809f5129fd4a5f593a4904f9b086f46c9f76
Author: Sanket <schintap@untilservice-lm>
Date: 2016-01-19T21:48:41Z

    Changed info level to debug level
[GitHub] spark pull request: [SPARK-6166] Limit number of concurrent outbou...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10838#discussion_r50208257

    --- Diff: docs/configuration.md ---
    @@ -392,6 +392,17 @@ Apart from these, the following properties are also available, and may be useful
    + spark.reducer.maxReqsInFlight
    + 20
    +
    +spark.reducer.maxMbInFlight puts a bound on the in flight data in terms of size.
    --- End diff --

    The full description covers the configuration itself, if you can take a look at it. The first line just explains why maxMbInFlight alone is insufficient.
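Why a byte bound alone is insufficient can be seen in a small simulation of the send loop. This is a sketch under stated assumptions, not the actual `fetchUpToMaxBytes`: the `Req` type, the object name, and `sendUpToLimits` are hypothetical, and the byte-limit clause after `reqsInFlight + 1 <= maxReqsInFlight &&` is an assumption, since the condition is cut off in the diff quoted elsewhere in this thread. No request ever completes in the simulation, so it only shows how many requests are admitted before a limit is hit.

```scala
import scala.collection.mutable

object FetchGateSketch {
  // Hypothetical request type: only the size matters here.
  final case class Req(size: Long)

  // Returns how many queued requests are sent before a limit stops the loop.
  def sendUpToLimits(sizes: Seq[Long], maxBytesInFlight: Long, maxReqsInFlight: Int): Int = {
    val fetchRequests = mutable.Queue(sizes.map(s => Req(s)): _*)
    var bytesInFlight = 0L
    var reqsInFlight = 0
    while (fetchRequests.nonEmpty &&
      (bytesInFlight == 0 ||
        (reqsInFlight + 1 <= maxReqsInFlight &&
          // Assumed continuation of the truncated condition in the diff.
          bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {
      val req = fetchRequests.dequeue()
      bytesInFlight += req.size
      reqsInFlight += 1
    }
    reqsInFlight
  }

  def main(args: Array[String]): Unit = {
    // Many tiny requests: the byte bound never triggers, the request bound does.
    println(sendUpToLimits(Seq.fill(100)(10L), maxBytesInFlight = 1000000L, maxReqsInFlight = 20))
  }
}
```

With 100 requests of 10 bytes each and a generous byte limit, the byte bound would admit all of them at once; the request bound caps the number sent at 20, the value shown in the quoted documentation diff. The `bytesInFlight == 0` clause still lets a single oversized request through when nothing else is in flight.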
[GitHub] spark pull request: [SPARK-6166] Limit number of concurrent outbou...
Github user redsanket commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10838#discussion_r50208175

    --- Diff: core/src/main/scala/org/apache/spark/SecurityManager.scala ---
    @@ -81,7 +81,7 @@ import org.apache.spark.util.Utils
      * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
      *for the HttpServer. Jetty supports multiple authentication mechanisms -
      *Basic, Digest, Form, Spengo, etc. It also supports multiple different login
    - *services - Hash, JAAS, Spnego, JDBC, etc. Spark currently uses the HashLoginService
    + *services - Hash, JAAS, Spengo, JDBC, etc. Spark currently uses the HashLoginService
    --- End diff --

    The line above had "Spengo", so when I saw it I changed this one to match. I might have to change it the other way round.