[jira] [Commented] (SPARK-2282) PySpark crashes if too many tasks complete quickly
[ https://issues.apache.org/jira/browse/SPARK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071669#comment-14071669 ] Ken Carlile commented on SPARK-2282:

Well, something didn't work quite right: our copy of 1.0.1 is the prebuilt copy for Hadoop 1/CDH3. So I did a git init in that directory, then ran

{code}
git pull https://github.com/apache/spark/ +refs/pull/1503/head
{code}

That didn't work either. I don't expect you to solve my git noob problems, so I'll work with someone here to figure it out.

PySpark crashes if too many tasks complete quickly
--
Key: SPARK-2282
URL: https://issues.apache.org/jira/browse/SPARK-2282
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 0.9.1, 1.0.0, 1.0.1
Reporter: Aaron Davidson
Assignee: Aaron Davidson
Fix For: 0.9.2, 1.0.0, 1.0.1

Upon every task completion, PythonAccumulatorParam constructs a new socket to the Accumulator server running inside the pyspark daemon. This can cause a buildup of used ephemeral ports from sockets in the TIME_WAIT termination stage, which will cause the SparkContext to crash if too many tasks complete too quickly. We ran into this bug with 17k tasks completing in 15 seconds.

This bug can be fixed outside of Spark by ensuring these properties are set (on a linux server):
{code}
echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse
echo 1 > /proc/sys/net/ipv4/tcp_tw_recycle
{code}
or by adding the SO_REUSEADDR option to the Socket creation within Spark.
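To make the failure mode in the description concrete, here is a minimal Python sketch, not Spark's actual code, contrasting the two connection patterns the description mentions: a brand-new socket per accumulator update versus one reused connection. The host, port, and 4-byte message format are invented for illustration.

{code}
import socket
import struct

HOST, PORT = "127.0.0.1", 50007   # hypothetical accumulator-server endpoint

def send_update_per_connection(value):
    """One new connection per task completion (the pattern the issue describes).
    Each close() leaves the local ephemeral port in TIME_WAIT, so tens of
    thousands of quick task completions can exhaust the ephemeral port range."""
    s = socket.create_connection((HOST, PORT))
    try:
        s.sendall(struct.pack("!i", value))   # made-up 4-byte update message
    finally:
        s.close()

class ReusedConnectionSender(object):
    """A single long-lived connection reused for every update, so only one
    ephemeral port is consumed no matter how many tasks finish."""
    def __init__(self):
        self.sock = socket.create_connection((HOST, PORT))

    def send(self, value):
        self.sock.sendall(struct.pack("!i", value))
{code}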
[jira] [Commented] (SPARK-2282) PySpark crashes if too many tasks complete quickly
[ https://issues.apache.org/jira/browse/SPARK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071767#comment-14071767 ] Ken Carlile commented on SPARK-2282:

Merging just the two files also did not work. I received a bunch of these errors during the test:

{code}
Exception happened during processing of request from ('127.0.0.1', 33116)
Traceback (most recent call last):
  File "/usr/local/python-2.7.6/lib/python2.7/SocketServer.py", line 295, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/local/python-2.7.6/lib/python2.7/SocketServer.py", line 321, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/python-2.7.6/lib/python2.7/SocketServer.py", line 334, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/python-2.7.6/lib/python2.7/SocketServer.py", line 649, in __init__
    self.handle()
  File "/usr/local/spark-current/python/pyspark/accumulators.py", line 224, in handle
    num_updates = read_int(self.rfile)
  File "/usr/local/spark-current/python/pyspark/serializers.py", line 337, in read_int
    raise EOFError
EOFError
{code}

And then it errored out with the usual java thing.
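For context on that traceback: read_int raises EOFError when the stream ends before a complete 4-byte value arrives, which is presumably what happens here when the JVM-side connections are aborted as the SparkContext shuts down, so the EOFError spam on the Python side looks like a symptom of the failing connections rather than a separate bug. A simplified stand-in (not the actual pyspark source) for illustration:

{code}
import struct
from io import BytesIO

def read_int(stream):
    # Simplified sketch of pyspark's read_int: expect exactly 4 bytes
    # (a big-endian int). If the peer closed the connection early, read()
    # returns fewer bytes and we signal end-of-stream with EOFError.
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError
    return struct.unpack("!i", data)[0]

# Example: a stream that was closed before a full int arrived.
try:
    read_int(BytesIO(b""))   # empty stream -> EOFError
except EOFError:
    print("EOFError: peer closed the connection before sending an update count")
{code}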
[jira] [Commented] (SPARK-2282) PySpark crashes if too many tasks complete quickly
[ https://issues.apache.org/jira/browse/SPARK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14070367#comment-14070367 ] Ken Carlile commented on SPARK-2282:

Hi Aaron,

Another question for you: would it work for me to just drop the two changed files into our install of the Spark 1.0.1 release, or is that likely to cause issues?

Thanks,
Ken
[jira] [Commented] (SPARK-2282) PySpark crashes if too many tasks complete quickly
[ https://issues.apache.org/jira/browse/SPARK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14064996#comment-14064996 ] Ken Carlile commented on SPARK-2282:

So we've just given this a try with a 32 node cluster. Without the two sysctl commands, it obviously failed, using this code in pyspark:

{code}
data = sc.parallelize(range(0,3000), 2000).map(lambda x: range(0,300))
data.cache()
data.count()
for i in range(0,20):
    data.count()
{code}

Unfortunately, with the two sysctls implemented on all nodes in the cluster, it also failed. Here's the java errors we see:

{code:java}
14/07/17 10:55:37 ERROR DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext
java.net.NoRouteToHostException: Cannot assign requested address
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:579)
  at java.net.Socket.connect(Socket.java:528)
  at java.net.Socket.<init>(Socket.java:425)
  at java.net.Socket.<init>(Socket.java:208)
  at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:404)
  at org.apache.spark.api.python.PythonAccumulatorParam.addInPlace(PythonRDD.scala:387)
  at org.apache.spark.Accumulable.$plus$plus$eq(Accumulators.scala:72)
  at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:280)
  at org.apache.spark.Accumulators$$anonfun$add$2.apply(Accumulators.scala:278)
  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
  at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
  at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
  at org.apache.spark.Accumulators$.add(Accumulators.scala:278)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:820)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1226)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 708, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 699, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 619, in reduce
    vals = self.mapPartitions(func).collect()
  File "/usr/local/spark-current/python/pyspark/rdd.py", line 583, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/usr/local/spark-current/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/usr/local/spark-current/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o158.collect.
: org.apache.spark.SparkException: Job 14 cancelled as part of cancellation of all jobs
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
  at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1009)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply$mcVI$sp(DAGScheduler.scala:499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:499)
{code}
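The "Cannot assign requested address" coming out of PythonAccumulatorParam.addInPlace is what ephemeral-port exhaustion typically looks like; since addInPlace runs in the DAGScheduler event loop and the Python-side server sees connections from 127.0.0.1, the socket churn appears to happen on the driver node (an inference from the traces, not a confirmed answer). One way to check that diagnosis while the test runs is to count TIME_WAIT sockets on the driver. A Linux-only sketch, relying on the fact that state code 06 in /proc/net/tcp means TIME_WAIT:

{code}
def count_time_wait():
    # Count local IPv4 TCP sockets currently in TIME_WAIT by scanning
    # /proc/net/tcp. The fourth whitespace-separated field ("st") holds the
    # connection state; 06 == TIME_WAIT.
    count = 0
    with open("/proc/net/tcp") as f:
        next(f)  # skip the header line
        for line in f:
            fields = line.split()
            if len(fields) > 3 and fields[3] == "06":
                count += 1
    return count

if __name__ == "__main__":
    print("sockets in TIME_WAIT: %d" % count_time_wait())
{code}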
[jira] [Commented] (SPARK-2282) PySpark crashes if too many tasks complete quickly
[ https://issues.apache.org/jira/browse/SPARK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14065025#comment-14065025 ] Ken Carlile commented on SPARK-2282:

A little more info:

Nodes are running Scientific Linux 6.3 (Linux 2.6.32-279.el6.x86_64 #1 SMP Thu Jun 21 07:08:44 CDT 2012 x86_64 x86_64 x86_64 GNU/Linux). Spark is run against Python 2.7.6, Java 1.7.0_25, and Scala 2.10.3.

spark-env.sh:
{code}
#!/usr/bin/env bash
ulimit -n 65535
export SCALA_HOME=/usr/local/scala-2.10.3
export SPARK_WORKER_DIR=/scratch/spark/work
export JAVA_HOME=/usr/local/jdk1.7.0_25
export SPARK_LOG_DIR=~/.spark/logs/$JOB_ID/
export SPARK_EXECUTOR_MEMORY=100g
export SPARK_DRIVER_MEMORY=100g
export SPARK_WORKER_MEMORY=100g
export SPARK_LOCAL_DIRS=/scratch/spark/tmp
export PYSPARK_PYTHON=/usr/local/python-2.7.6/bin/python
export SPARK_SLAVES=/scratch/spark/tmp/slaves
{code}

spark-defaults.conf:
{code}
spark.akka.timeout=300
spark.storage.blockManagerHeartBeatMs=3
spark.akka.retry.wait=30
spark.akka.frameSize=1
{code}
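While ruling things out, a quick way to confirm that the tcp_tw_* toggles from the issue description are actually in effect on a given node (a small sketch, assuming the standard Linux /proc paths):

{code}
# Print the current value of the two TCP TIME_WAIT toggles mentioned in the
# issue description; both should read "1" after the sysctl change.
for name in ("tcp_tw_reuse", "tcp_tw_recycle"):
    with open("/proc/sys/net/ipv4/%s" % name) as f:
        print("%s = %s" % (name, f.read().strip()))
{code}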
[jira] [Commented] (SPARK-2282) PySpark crashes if too many tasks complete quickly
[ https://issues.apache.org/jira/browse/SPARK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14065549#comment-14065549 ] Ken Carlile commented on SPARK-2282:

Awesome. I was afraid we were trying to chase down something else here. Glad to hear that it's a known issue and that you've got a good idea how to fix it. Thanks for the quick response!

--Ken
[jira] [Commented] (SPARK-2282) PySpark crashes if too many tasks complete quickly
[ https://issues.apache.org/jira/browse/SPARK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062916#comment-14062916 ] Ken Carlile commented on SPARK-2282:

We may be running into this issue on our cluster. Any input on whether this property needs to be set on all nodes or only on the master? I ask because we dynamically spin up Spark clusters on a larger general-purpose compute cluster, so I'm hesitant to start changing sysctls willy-nilly unless I absolutely have to.

Thanks,
Ken
[jira] [Comment Edited] (SPARK-2282) PySpark crashes if too many tasks complete quickly
[ https://issues.apache.org/jira/browse/SPARK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062916#comment-14062916 ] Ken Carlile edited comment on SPARK-2282 at 7/16/14 12:14 AM:

We may be running into this issue on our cluster. Any input on whether this property needs to be set on all nodes or only on the master? I ask because we dynamically spin up Spark clusters on a larger general-purpose compute cluster, so I'm hesitant to start changing sysctls willy-nilly unless I absolutely have to.

Alternately, is SO_REUSEADDR merely a setting one can change in one of the conf files, or is that within the software written for Spark? (I'm coming at this from a sysadmin point of view, so the former would be much easier!)

Thanks,
Ken
[jira] [Comment Edited] (SPARK-2282) PySpark crashes if too many tasks complete quickly
[ https://issues.apache.org/jira/browse/SPARK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062916#comment-14062916 ] Ken Carlile edited comment on SPARK-2282 at 7/16/14 12:17 AM:

We may be running into this issue on our cluster. Any input on whether this property needs to be set on all nodes or only on the master? I ask because we dynamically spin up Spark clusters on a larger general-purpose compute cluster, so I'm hesitant to start changing sysctls willy-nilly unless I absolutely have to.

Alternately, is SO_REUSEADDR merely a setting one can change in one of the conf files, or is that within the software written for Spark? (I'm coming at this from a sysadmin point of view, so the former would be much easier!)

Odd thing is that we're seeing it on 1.0.1, in which it is supposed to be fixed...

Thanks,
Ken
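On the conf-file question: SO_REUSEADDR is not a Spark configuration property; it is a per-socket option that has to be set by the code that creates the socket, before it connects (the socket the stack traces point at is created on the JVM side, in PythonRDD.scala). A Python illustration of the general mechanism, not Spark's actual patch, with a made-up endpoint:

{code}
import socket

def open_update_socket(host, port):
    # Create the client socket, but enable SO_REUSEADDR before connecting.
    # The option tells the kernel it may reuse a local address that is still
    # in TIME_WAIT, which is the in-code mitigation the issue description
    # suggests.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.connect((host, port))
    return s

# Usage (hypothetical endpoint; nothing in this sketch starts a server):
# sock = open_update_socket("127.0.0.1", 50007)
{code}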