Re: rdd.count with 100 elements taking 1 second to run
Hi Akhil, I discovered the reason for this problem. There was an issue with my deployment (Google Cloud Platform): my Spark master was in a different region from the slaves, which resulted in huge scheduler delays. Thanks, Anshul

On Thu, Apr 30, 2015 at 1:39 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Does this speed it up?

    val rdd = sc.parallelize(1 to 100, 30)
    rdd.count

Thanks, Best Regards

On Wed, Apr 29, 2015 at 1:47 AM, Anshul Singhle ans...@betaglide.com wrote: Hi, I'm running the following code in my cluster (standalone mode) via the spark shell:

    val rdd = sc.parallelize(1 to 100)
    rdd.count

This takes around 1.2s to run. Is this expected, or am I configuring something wrong? I'm using about 30 cores with 512MB of executor memory. As expected, GC time is negligible; I'm just getting some scheduler delay and about 1s to launch the task. Thanks, Anshul
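For anyone comparing the two calls in a spark-shell, a rough sketch (the time helper below is just ad-hoc wall-clock timing, not a Spark API):

    val rdd1 = sc.parallelize(1 to 100)        // default partitioning
    val rdd2 = sc.parallelize(1 to 100, 30)    // explicit 30 partitions, roughly one per core

    // Ad-hoc timing helper for the shell.
    def time[T](body: => T): T = {
      val start = System.nanoTime()
      val result = body
      println(s"took ${(System.nanoTime() - start) / 1e6} ms")
      result
    }

    time(rdd1.count())
    time(rdd2.count())

With the master and slaves in the same region, the difference between the two should come down to scheduler overhead rather than network latency.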
Re: java.io.IOException: No space left on device
Do you have multiple disks? Maybe your work directory is not on the right disk?

On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi selim.na...@gmail.com wrote: Hi, I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf output; the training data is a file containing 156060 (size 8.1M). The problem is that when Spark tries to persist a partition into memory and there is not enough memory, the partition is persisted on disk, and despite having 229 GB of free disk space, I get "No space left on device". This is how I'm running the program:

    ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master local[2] --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv testData.tsv

And this is a part of the log: If you need more information, please let me know. Thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-tp22702.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
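If there is a second disk with free space, one common way to move Spark's shuffle/spill scratch space onto it is the spark.local.dir property. A minimal sketch, assuming a hypothetical mount point /data1/spark-tmp (substitute whatever disk actually has the 229 GB free):

    import org.apache.spark.{SparkConf, SparkContext}

    // Point Spark's temporary/spill directory at a disk with free space.
    // "/data1/spark-tmp" is an assumed path, not one from this thread.
    val conf = new SparkConf()
      .setAppName("sentimentAnalysis")
      .set("spark.local.dir", "/data1/spark-tmp")
    val sc = new SparkContext(conf)

The same setting can also be passed on the spark-submit command line as --conf spark.local.dir=/data1/spark-tmp, which avoids touching the application code.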
Re: Initial tasks in job take time
Yes.

On 29 Apr 2015 03:31, ayan guha guha.a...@gmail.com wrote: Is your driver running on the same machine as the master?

On 29 Apr 2015 03:59, Anshul Singhle ans...@betaglide.com wrote: Hi, I'm running short Spark jobs on RDDs cached in memory, with a long-running job context, and I want to be able to complete my jobs (on the cached RDD) in under 1 second. I'm getting the following job times with about 15 GB of data distributed across 6 nodes. Each executor has about 20 GB of memory available, and my context has about 26 cores in total.

If the number of partitions is less than the number of cores, some jobs run in 3s and others take about 6s; the time difference can be explained by GC time.

If the number of partitions equals the number of cores, all jobs run in 4s. The initial tasks of each stage on every executor take about 1s.

If the number of partitions is greater than the number of cores, jobs take more time. The initial tasks of each stage on every executor take about 1s, and the other tasks run in 45-50 ms each. However, since the initial tasks again take about 1s each, the total time in this case is about 6s, which is more than the previous case.

Clearly the limiting factor here is the initial set of tasks. In every case these tasks take 1s to run, no matter the number of partitions. Hence the best results are obtained with partitions = cores, because in that case every core gets one task which takes 1s to run. In this case I get 0 GC time, so the only explanation is scheduling delay, which is about 0.2-0.3 seconds. I looked at my task size and result size, and they have no bearing on this delay; I'm also not getting the task-size warnings in the logs.

From what I can understand, the first time a task runs on a core, it takes 1s to run. Is this normal? Is it possible to get sub-second latencies? Can something be done about the scheduler delay? What other things can I look at to reduce this time? Regards, Anshul
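For what it's worth, a rough sketch of the two knobs discussed in this thread: pinning the partition count to the total core count, and running one throwaway action on the long-lived context so the first real query doesn't pay the one-off first-task cost. cachedRdd and totalCores = 26 are stand-ins for the cached RDD and cluster described above:

    // cachedRdd stands in for the RDD already cached in the long-running context.
    val totalCores = 26
    val tuned = cachedRdd.repartition(totalCores).cache()

    // Warm-up action: materialises the cache and pays the ~1s first-task cost once,
    // so later jobs on `tuned` mostly see scheduler delay plus the 45-50 ms compute.
    tuned.count()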
rdd.count with 100 elements taking 1 second to run
Hi, I'm running the following code in my cluster (standalone mode) via the spark shell:

    val rdd = sc.parallelize(1 to 100)
    rdd.count

This takes around 1.2s to run. Is this expected, or am I configuring something wrong? I'm using about 30 cores with 512MB of executor memory. As expected, GC time is negligible; I'm just getting some scheduler delay and about 1s to launch the task. Thanks, Anshul
Initial tasks in job take time
Hi, I'm running short Spark jobs on RDDs cached in memory, with a long-running job context, and I want to be able to complete my jobs (on the cached RDD) in under 1 second. I'm getting the following job times with about 15 GB of data distributed across 6 nodes. Each executor has about 20 GB of memory available, and my context has about 26 cores in total.

If the number of partitions is less than the number of cores, some jobs run in 3s and others take about 6s; the time difference can be explained by GC time.

If the number of partitions equals the number of cores, all jobs run in 4s. The initial tasks of each stage on every executor take about 1s.

If the number of partitions is greater than the number of cores, jobs take more time. The initial tasks of each stage on every executor take about 1s, and the other tasks run in 45-50 ms each. However, since the initial tasks again take about 1s each, the total time in this case is about 6s, which is more than the previous case.

Clearly the limiting factor here is the initial set of tasks. In every case these tasks take 1s to run, no matter the number of partitions. Hence the best results are obtained with partitions = cores, because in that case every core gets one task which takes 1s to run. In this case I get 0 GC time, so the only explanation is scheduling delay, which is about 0.2-0.3 seconds. I looked at my task size and result size, and they have no bearing on this delay; I'm also not getting the task-size warnings in the logs.

From what I can understand, the first time a task runs on a core, it takes 1s to run. Is this normal? Is it possible to get sub-second latencies? Can something be done about the scheduler delay? What other things can I look at to reduce this time? Regards, Anshul
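One way to see where that first second actually goes is to attach a listener to the long-running context and log, per finished task, how the wall-clock time splits between deserialising the task and running it; whatever is left over is roughly the scheduler/launch overhead shown in the UI. A rough sketch against the developer-API listener interface:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Log wall-clock vs. deserialization vs. run time for every finished task.
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val info = taskEnd.taskInfo
        val metrics = taskEnd.taskMetrics
        if (info != null && metrics != null) {
          val wallMs = info.finishTime - info.launchTime
          println(s"task ${info.taskId}: wall=${wallMs}ms " +
            s"deserialize=${metrics.executorDeserializeTime}ms " +
            s"run=${metrics.executorRunTime}ms")
        }
      }
    })

If the first task on each core shows a large deserialize time, the one-second hit is likely classloading on the executor rather than pure scheduler delay.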
Re: Instantiating/starting Spark jobs programmatically
Hi firemonk9, What you're doing looks interesting, can you share some more details? Are you reusing the same Spark context for each job, or are you running a separate Spark context per job? Does your system need sharing of RDDs across multiple jobs? If yes, how do you implement that? Also, what prompted you to run YARN instead of standalone? Does this give some performance benefit? Have you evaluated YARN vs. Mesos? Also, have you looked at spark-jobserver by Ooyala? It makes doing some of the stuff I mentioned easier. IIRC it also works with YARN, and it definitely works with Mesos. Here's the link: https://github.com/spark-jobserver/spark-jobserver Thanks, Anshul

On 23 Apr 2015 20:32, Dean Wampler deanwamp...@gmail.com wrote: I strongly recommend spawning a new process for the Spark jobs. Much cleaner separation; your driver program won't be clobbered if the Spark job dies, etc. It can even watch for failures and restart. In the Scala standard library, the sys.process package has classes for constructing and interoperating with external processes. Perhaps Java has something similar these days? Dean

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Tue, Apr 21, 2015 at 2:15 PM, Steve Loughran ste...@hortonworks.com wrote: On 21 Apr 2015, at 17:34, Richard Marscher rmarsc...@localytics.com wrote: There are System.exit calls built into Spark as of now that could kill your running JVM. We have shadowed some of the most offensive bits within our own application to work around this. You'd likely want to do that or to do your own Spark fork. For example, if the SparkContext can't connect to your cluster master node when it is created, it will System.exit.

People can block errant System.exit calls by running under a SecurityManager. Less than ideal (and there's a small performance hit), but possible.
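As a concrete illustration of the process-spawning approach (class name, master URL, and jar path below are placeholders, not anything from this thread), the parent application can launch each job as its own spark-submit process and just watch the exit code:

    import scala.sys.process._

    // Launch the job in a separate JVM so a failure (or a stray System.exit
    // inside Spark) cannot take the parent application down with it.
    val submit = Seq(
      "spark-submit",
      "--class", "com.example.MyJob",      // placeholder job class
      "--master", "spark://master:7077",   // placeholder standalone master URL
      "/path/to/my-job.jar")               // placeholder application jar

    val exitCode = submit.!                // blocks until the child process exits
    if (exitCode != 0) {
      // The parent JVM is still alive to retry, alert, or back off.
      println(s"Spark job failed with exit code $exitCode")
    }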
Getting outofmemory errors on spark
Hi, I'm reading data stored in S3, aggregating it, and storing it in Cassandra using a Spark job. When I run the job with approximately 3 million records (about 3-4 GB of data) stored in text files, I get the following error:

    (11529/14925)15/04/10 19:32:43 INFO TaskSetManager: Starting task 11609.0 in stage 4.0 (TID 56384, spark-slaves-test-cluster-k0b6.c.silver-argon-837.internal, PROCESS_LOCAL, 134
    15/04/10 19:32:58 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Arrays.copyOf(Arrays.java:2367)
        at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:535)
        at java.lang.StringBuilder.append(StringBuilder.java:204)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTFSpan(ObjectInputStream.java:3143)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3051)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
        at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)
        at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)
        at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)

This error occurs in the final step of my script, when I'm storing the processed records in Cassandra.
My memory per node is 10 GB, which means that all my records should fit on one machine. The script is in PySpark, and I'm using a cluster with:

- Workers: 5
- Cores: 80 Total, 80 Used
- Memory: 506.5 GB Total, 40.0 GB Used

Here is the relevant part of the code, for reference:

    from cassandra.cluster import Cluster

    def connectAndSave(partition):
        # One Cassandra connection per partition, reused for every record in it.
        cluster = Cluster(['10.240.1.17'])
        dbsession = cluster.connect('load_test')
        ret = map(lambda x: saveUserData(x, dbsession), partition)
        dbsession.shutdown()
        cluster.shutdown()

    res = sessionsRdd.foreachPartition(lambda partition: connectAndSave(partition))