Re: Worker and Nodes
So increasing Executors without increasing physical resources If I have a 16 GB RAM system and then I allocate 1 GB for each executor, and give number of executors as 8, then I am increasing the resource right? In this case, how do you explain? Thank You On Sun, Feb 22, 2015 at 6:12 AM, Aaron Davidson ilike...@gmail.com wrote: Note that the parallelism (i.e., number of partitions) is just an upper bound on how much of the work can be done in parallel. If you have 200 partitions, then you can divide the work among between 1 and 200 cores and all resources will remain utilized. If you have more than 200 cores, though, then some will not be used, so you would want to increase parallelism further. (There are other rules-of-thumb -- for instance, it's generally good to have at least 2x more partitions than cores for straggler mitigation, but these are essentially just optimizations.) Further note that when you increase the number of Executors for the same set of resources (i.e., starting 10 Executors on a single machine instead of 1), you make Spark's job harder. Spark has to communicate in an all-to-all manner across Executors for shuffle operations, and it uses TCP sockets to do so whether or not the Executors happen to be on the same machine. So increasing Executors without increasing physical resources means Spark has to do more communication to do the same work. We expect that increasing the number of Executors by a factor of 10, given an increase in the number of physical resources by the same factor, would also improve performance by 10x. This is not always the case for the precise reason above (increased communication overhead), but typically we can get close. The actual observed improvement is very algorithm-dependent, though; for instance, some ML algorithms become hard to scale out past a certain point because the increase in communication overhead outweighs the increase in parallelism. 
On Sat, Feb 21, 2015 at 8:19 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: So, if I keep the number of instances constant and increase the degree of parallelism in steps, can I expect the performance to increase? Thank You On Sat, Feb 21, 2015 at 9:07 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: So, with the increase in the number of worker instances, if I also increase the degree of parallelism, will it make any difference? I can use this model even the other way round right? I can always predict the performance of an app with the increase in number of worker instances, the deterioration in performance, right? Thank You On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I have decreased the executor memory. But,if I have to do this, then I have to tweak around with the code corresponding to each configuration right? On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote: Workers has a specific meaning in Spark. You are running many on one machine? that's possible but not usual. Each worker's executors have access to a fraction of your machine's resources then. If you're not increasing parallelism, maybe you're not actually using additional workers, so are using less resource for your problem. Or because the resulting executors are smaller, maybe you're hitting GC thrashing in these executors with smaller heaps. Or if you're not actually configuring the executors to use less memory, maybe you're over-committing your RAM and swapping? Bottom line, you wouldn't use multiple workers on one small standalone node. This isn't a good way to estimate performance on a distributed cluster either. On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I just have a single node standalone cluster. I am not tweaking around with the code to increase parallelism. I am just running SparkKMeans that is there in Spark-1.0.0 I just wanted to know, if this behavior is natural. 
And if so, what causes this? Thank you On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote: What's your storage like? Are you adding worker machines that are remote from where the data lives? I wonder if it just means you are spending more and more time sending the data over the network as you try to ship more of it to more remote workers. To answer your question: no, in general more workers means more parallelism and therefore faster execution. But that depends on a lot of things. For example, if your process isn't parallelized to use all available execution slots, adding more slots doesn't do anything. On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I am talking about a standalone single-node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for what is happening? I am running SparkKMeans, just to
Re: Perf Prediction
Has anyone done any work on that? On Sun, Feb 22, 2015 at 9:57 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, exactly. On Sun, Feb 22, 2015 at 9:10 AM, Ognen Duzlevski ognen.duzlev...@gmail.com wrote: On Sat, Feb 21, 2015 at 8:54 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I am talking about some work parallel to prediction works that are done on GPUs. Like say, given the data for smaller number of nodes in a Spark cluster, the prediction needs to be done about the time that the application would take when we have larger number of nodes. Are you talking about predicting how performance would increase with adding more nodes/CPUs/whatever?
Re: Worker and Nodes
Note that the parallelism (i.e., number of partitions) is just an upper bound on how much of the work can be done in parallel. If you have 200 partitions, then you can divide the work among between 1 and 200 cores and all resources will remain utilized. If you have more than 200 cores, though, then some will not be used, so you would want to increase parallelism further. (There are other rules-of-thumb -- for instance, it's generally good to have at least 2x more partitions than cores for straggler mitigation, but these are essentially just optimizations.) Further note that when you increase the number of Executors for the same set of resources (i.e., starting 10 Executors on a single machine instead of 1), you make Spark's job harder. Spark has to communicate in an all-to-all manner across Executors for shuffle operations, and it uses TCP sockets to do so whether or not the Executors happen to be on the same machine. So increasing Executors without increasing physical resources means Spark has to do more communication to do the same work. We expect that increasing the number of Executors by a factor of 10, given an increase in the number of physical resources by the same factor, would also improve performance by 10x. This is not always the case for the precise reason above (increased communication overhead), but typically we can get close. The actual observed improvement is very algorithm-dependent, though; for instance, some ML algorithms become hard to scale out past a certain point because the increase in communication overhead outweighs the increase in parallelism. On Sat, Feb 21, 2015 at 8:19 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: So, if I keep the number of instances constant and increase the degree of parallelism in steps, can I expect the performance to increase? 
Thank You On Sat, Feb 21, 2015 at 9:07 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: So, with the increase in the number of worker instances, if I also increase the degree of parallelism, will it make any difference? I can use this model even the other way round right? I can always predict the performance of an app with the increase in number of worker instances, the deterioration in performance, right? Thank You On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I have decreased the executor memory. But,if I have to do this, then I have to tweak around with the code corresponding to each configuration right? On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote: Workers has a specific meaning in Spark. You are running many on one machine? that's possible but not usual. Each worker's executors have access to a fraction of your machine's resources then. If you're not increasing parallelism, maybe you're not actually using additional workers, so are using less resource for your problem. Or because the resulting executors are smaller, maybe you're hitting GC thrashing in these executors with smaller heaps. Or if you're not actually configuring the executors to use less memory, maybe you're over-committing your RAM and swapping? Bottom line, you wouldn't use multiple workers on one small standalone node. This isn't a good way to estimate performance on a distributed cluster either. On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I just have a single node standalone cluster. I am not tweaking around with the code to increase parallelism. I am just running SparkKMeans that is there in Spark-1.0.0 I just wanted to know, if this behavior is natural. And if so, what causes this? Thank you On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote: What's your storage like? are you adding worker machines that are remote from where the data lives? 
I wonder if it just means you are spending more and more time sending the data over the network as you try to ship more of it to more remote workers. To answer your question: no, in general more workers means more parallelism and therefore faster execution. But that depends on a lot of things. For example, if your process isn't parallelized to use all available execution slots, adding more slots doesn't do anything. On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I am talking about a standalone single-node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for what is happening? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I am running in standalone mode. I am varying the workers from 1 to 16 On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote: I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?), so you may be executing more remotely. Are you
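A rough way to picture Aaron Davidson's point in this thread, that the number of partitions is only an upper bound on parallelism, is the plain-Python sketch below. It is not Spark code: the function name task_waves and the idealised assumption that every task takes the same time on every core are illustrative assumptions, not something stated in the thread.

```python
import math

def task_waves(num_partitions, num_cores):
    """Idealised model: one task per partition, one running task per core."""
    if num_partitions < 1 or num_cores < 1:
        raise ValueError("partitions and cores must be positive")
    busy_cores = min(num_partitions, num_cores)    # cores that ever receive a task
    idle_cores = num_cores - busy_cores            # cores that never receive a task
    waves = math.ceil(num_partitions / num_cores)  # rounds of tasks needed to finish
    return busy_cores, idle_cores, waves

print(task_waves(200, 16))   # 200 partitions on 16 cores
print(task_waves(200, 400))  # 200 partitions on 400 cores: half the cores stay idle
```

In this toy model, going past 200 cores with 200 partitions adds only idle cores, which is the situation where the note suggests increasing parallelism further. The model ignores shuffle traffic between executors, so it says nothing about the communication overhead discussed in the same message.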
Executor size and checkpoints
Hi all, I had a streaming application and midway through things decided to up the executor memory. I spent a long time launching like this: ~/spark-1.2.0-bin-cdh4/bin/spark-submit --class StreamingTest --executor-memory 2G --master... and observing that the executor memory was still at the old 512 setting. I was about to ask if this is a bug when I decided to delete the checkpoints. Sure enough, the setting took effect after that. So my question is: why is it required to remove checkpoints to increase the memory allowed on an executor? This seems pretty unintuitive to me. Thanks for any insights.
Re: Perf Prediction
Yes, exactly. On Sun, Feb 22, 2015 at 9:10 AM, Ognen Duzlevski ognen.duzlev...@gmail.com wrote: On Sat, Feb 21, 2015 at 8:54 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I am talking about some work parallel to prediction works that are done on GPUs. Like say, given the data for smaller number of nodes in a Spark cluster, the prediction needs to be done about the time that the application would take when we have larger number of nodes. Are you talking about predicting how performance would increase with adding more nodes/CPUs/whatever?
Re: Worker and Nodes
Also, If I take SparkPageRank for example (org.apache.spark.examples), there are various RDDs that are created and transformed in the code that is written. If I want to increase the number of partitions and test out, what is the optimum number of partitions that gives me the best performance, I have to change the number of partitions in each run, right? Now, there are various RDDs there, so, which RDD do I partition? In other words, if I partition the first RDD that is created from the data in HDFS, am I ensured that other RDDs that are transformed from this RDD will also be partitioned in the same way? Thank You On Sun, Feb 22, 2015 at 10:02 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: So increasing Executors without increasing physical resources If I have a 16 GB RAM system and then I allocate 1 GB for each executor, and give number of executors as 8, then I am increasing the resource right? In this case, how do you explain? Thank You On Sun, Feb 22, 2015 at 6:12 AM, Aaron Davidson ilike...@gmail.com wrote: Note that the parallelism (i.e., number of partitions) is just an upper bound on how much of the work can be done in parallel. If you have 200 partitions, then you can divide the work among between 1 and 200 cores and all resources will remain utilized. If you have more than 200 cores, though, then some will not be used, so you would want to increase parallelism further. (There are other rules-of-thumb -- for instance, it's generally good to have at least 2x more partitions than cores for straggler mitigation, but these are essentially just optimizations.) Further note that when you increase the number of Executors for the same set of resources (i.e., starting 10 Executors on a single machine instead of 1), you make Spark's job harder. Spark has to communicate in an all-to-all manner across Executors for shuffle operations, and it uses TCP sockets to do so whether or not the Executors happen to be on the same machine. 
So increasing Executors without increasing physical resources means Spark has to do more communication to do the same work. We expect that increasing the number of Executors by a factor of 10, given an increase in the number of physical resources by the same factor, would also improve performance by 10x. This is not always the case for the precise reason above (increased communication overhead), but typically we can get close. The actual observed improvement is very algorithm-dependent, though; for instance, some ML algorithms become hard to scale out past a certain point because the increase in communication overhead outweighs the increase in parallelism. On Sat, Feb 21, 2015 at 8:19 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: So, if I keep the number of instances constant and increase the degree of parallelism in steps, can I expect the performance to increase? Thank You On Sat, Feb 21, 2015 at 9:07 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: So, with the increase in the number of worker instances, if I also increase the degree of parallelism, will it make any difference? I can use this model even the other way round right? I can always predict the performance of an app with the increase in number of worker instances, the deterioration in performance, right? Thank You On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I have decreased the executor memory. But,if I have to do this, then I have to tweak around with the code corresponding to each configuration right? On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote: Workers has a specific meaning in Spark. You are running many on one machine? that's possible but not usual. Each worker's executors have access to a fraction of your machine's resources then. If you're not increasing parallelism, maybe you're not actually using additional workers, so are using less resource for your problem. 
Or because the resulting executors are smaller, maybe you're hitting GC thrashing in these executors with smaller heaps. Or if you're not actually configuring the executors to use less memory, maybe you're over-committing your RAM and swapping? Bottom line, you wouldn't use multiple workers on one small standalone node. This isn't a good way to estimate performance on a distributed cluster either. On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I just have a single node standalone cluster. I am not tweaking around with the code to increase parallelism. I am just running SparkKMeans that is there in Spark-1.0.0 I just wanted to know, if this behavior is natural. And if so, what causes this? Thank you On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote: What's your storage like? are you adding worker machines that are remote from where the data lives? I wonder if it just means you are spending more and more time
Re: Query data in Spark RRD
Yes. As my understanding, it would allow me to write SQLs to query a spark context. But, the query needs to be specified within a job deployed. What I want is to be able to run multiple dynamic queries specified at runtime from a dashboard. -- Nikhil Bafna On Sat, Feb 21, 2015 at 8:37 PM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD ? Cheers On Sat, Feb 21, 2015 at 4:24 AM, Nikhil Bafna nikhil.ba...@flipkart.com wrote: Hi. My use case is building a realtime monitoring system over multi-dimensional data. The way I'm planning to go about it is to use Spark Streaming to store aggregated count over all dimensions in 10 sec interval. Then, from a dashboard, I would be able to specify a query over some dimensions, which will need re-aggregation from the already computed job. My query is, how can I run dynamic queries over data in schema RDDs? -- Nikhil Bafna
Re: Perf Prediction
On Sat, Feb 21, 2015 at 8:54 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I am talking about some work parallel to prediction works that are done on GPUs. Like say, given the data for smaller number of nodes in a Spark cluster, the prediction needs to be done about the time that the application would take when we have larger number of nodes. Are you talking about predicting how performance would increase with adding more nodes/CPUs/whatever?
Re: Which OutputCommitter to use for S3?
Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec You can use it by setting mapred.output.committer.class in the Hadoop configuration (or spark.hadoop.mapred.output.committer.class in the Spark configuration). Note that this only works for the old Hadoop APIs; I believe the new Hadoop APIs strongly tie the committer to the output format (so FileOutputFormat always uses FileOutputCommitter), which makes this fix more difficult to apply. On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote: Josh, is that class something you guys would consider open sourcing, or would you rather the community step up and create an OutputCommitter implementation optimized for S3? On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote: We (Databricks) use our own DirectOutputCommitter implementation, which is a couple tens of lines of Scala code. The class would almost entirely be a no-op except we took some care to properly handle the _SUCCESS file. On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote: I didn’t get any response. It’d be really appreciated if anyone using a special OutputCommitter for S3 can comment on this! Thanks, Mingyu From: Mingyu Kim m...@palantir.com Date: Monday, February 16, 2015 at 1:15 AM To: user@spark.apache.org user@spark.apache.org Subject: Which OutputCommitter to use for S3? Hi all, The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to require moving files at the commit step, which is not a constant-time operation in S3, as discussed in this thread: http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E
People seem to develop their own NullOutputCommitter implementation or use DirectFileOutputCommitter (as mentioned in SPARK-3595, https://issues.apache.org/jira/browse/SPARK-3595), but I wanted to check if there is a de facto standard, publicly available OutputCommitter to use for S3 in conjunction with Spark. Thanks, Mingyu
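As a small illustration of the Spark-side configuration key named in the reply at the top of this message, the sketch below builds the corresponding spark-submit --conf argument. The helper name committer_conf_args and the class name com.example.DirectOutputCommitter are hypothetical; the thread does not say which package the gist's class is compiled into, so substitute your own fully qualified name.

```python
def committer_conf_args(committer_class):
    """Build spark-submit arguments that set the old-API Hadoop output committer."""
    # Key taken from the reply in this thread; the spark.hadoop. prefix forwards it
    # to the Hadoop configuration as mapred.output.committer.class.
    key = "spark.hadoop.mapred.output.committer.class"
    return ["--conf", "{}={}".format(key, committer_class)]

# Hypothetical class name, for illustration only.
args = committer_conf_args("com.example.DirectOutputCommitter")
print(" ".join(args))
```

As the reply notes, this route is expected to apply only to jobs that write through the old Hadoop APIs.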
Re: About FlumeUtils.createStream
Spark won't listen on the port, mate. It basically means you have a Flume source running at that port on your localhost, and when you submit your application in standalone mode, the workers will consume data from that port. Thanks Best Regards On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com bit1...@163.com wrote: Hi, In the Spark Streaming application, I write the code FlumeUtils.createStream(ssc,localhost,), which means Spark will listen on the port and wait for the Flume sink to write to it. My question is: when I submit the application to the Spark standalone cluster, will the port be opened only on the driver machine, or will all the workers also open the port and wait for the Flume data? --
Re: randomSplit instead of a huge map reduce ?
- Divide and conquer with reduceByKey (like Ashish mentioned, each pair being the key) would work - looks like a mapReduce with combiners problem. I think reduceByKey would use combiners while aggregateByKey wouldn't. - Could we optimize this further by using combineByKey directly? Cheers k/ On Fri, Feb 20, 2015 at 6:39 PM, Ashish Rangole arang...@gmail.com wrote: Is there a check you can put in place to not create pairs that aren't in your set of 20M pairs? Additionally, once you have your arrays converted to pairs you can do aggregateByKey with each pair being the key. On Feb 20, 2015 1:57 PM, shlomib shl...@summerhq.com wrote: Hi, I am new to Spark and I think I missed something very basic. I have the following use case (I use Java and run Spark locally on my laptop): I have a JavaRDD<String[]> - The RDD contains around 72,000 arrays of strings (String[]) - Each array contains 80 words (on average). What I want to do is to convert each array into a new array/list of pairs, for example: Input: String[] words = ['a', 'b', 'c'] Output: List[String, String] pairs = [('a', 'b'), ('a', 'c'), ('b', 'c')] and then I want to count the number of times each pair appeared, so my final output should be something like: Output: List[String, String, Integer] result = [('a', 'b', 3), ('a', 'c', 8), ('b', 'c', 10)] The problem: Since each array contains around 80 words, it returns around 3,200 pairs, so after “mapping” my entire RDD I get 3,200 * 72,000 = *230,400,000* pairs to reduce, which requires way too much memory. (I know I have only around *20,000,000* unique pairs!) I already modified my code and used 'mapPartitions' instead of 'map'. It definitely improved the performance, but I still feel I'm doing something completely wrong.
I was wondering if this is the right 'Spark way' to solve this kind of problem, or maybe I should do something like splitting my original RDD into smaller parts (by using randomSplit), then iterate over each part, aggregate the results into some result RDD (by using 'union') and move on to the next part. Can anyone please explain to me which solution is better? Thank you very much, Shlomi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/randomSplit-instead-of-a-huge-map-reduce-tp21744.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
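The combiner idea raised in the replies above can be sketched in plain Python, without Spark: count pairs inside each partition first, then merge the much smaller per-partition tables. The function names and the toy partitions are assumptions for illustration. Treating ('a', 'b') and ('b', 'a') as the same pair and ignoring repeated words within one array are also assumptions the original post does not settle. For scale, an array of 80 distinct words gives 80 * 79 / 2 = 3,160 pairs, in line with the "around 3,200" quoted.

```python
from collections import Counter
from itertools import combinations

def count_pairs_in_partition(arrays):
    """Map-side combine: one counter entry per distinct pair seen in this partition."""
    counts = Counter()
    for words in arrays:
        # sorted(set(...)) drops repeated words and gives each pair a canonical order
        counts.update(combinations(sorted(set(words)), 2))
    return counts

def merge_counts(left, right):
    """Reduce step: add up two per-partition count tables."""
    merged = Counter(left)
    merged.update(right)
    return merged

# Two toy partitions of word arrays (hypothetical data).
partitions = [
    [["a", "b", "c"], ["a", "b"]],
    [["b", "c", "a"]],
]
total = Counter()
for part in partitions:
    total = merge_counts(total, count_pairs_in_partition(part))

pairs_per_array = 80 * 79 // 2  # pairs from one array of 80 distinct words
print(total, pairs_per_array)
```

With this shape, what crosses partition boundaries is bounded by the number of distinct pairs per partition rather than by the 230,400,000 raw pairs, which is the effect the replies are after when they suggest a by-key aggregation.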
Re: Spark Performance on Yarn
How many executors do you have per machine? It would be helpful if you could list all the configs. Could you also try to run it without persist? Caching can hurt more than help if you don't have enough memory. On Fri, Feb 20, 2015 at 5:18 PM, Lee Bierman leebier...@gmail.com wrote: Thanks for the suggestions. I'm experimenting with different values for the Spark memoryOverhead setting and explicitly giving the executors more memory, but still have not found the happy medium to get it to finish in a proper time frame. Is my cluster massively undersized at 5 boxes, 8gb 2cpu? Trying to figure out a memory setting and executor setting so it runs on many containers in parallel. I'm still struggling, as Pig jobs and Hive jobs on the same whole data set don't take as long. I'm wondering too if the logic in our code is just doing something silly, causing multiple reads of all the data. On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com wrote: If that's the error you're hitting, the fix is to boost spark.yarn.executor.memoryOverhead, which will put some extra room in between the executor heap sizes and the amount of memory requested for them from YARN. -Sandy On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote: A bit more context on this issue, from the container logs on the executor. Given my cluster specs above, what would be appropriate parameters to pass into: --num-executors --num-cores --executor-memory I had tried it with --executor-memory 2500MB 2015-02-20 06:50:09,056 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing container.
Dump of the process-tree for container_1423083596644_0238_01_004160 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/CoarseGrainedScheduler 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout 2 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr |- 23323 23320 23320 23320 (java) 922271 12263 461976 724218 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
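To make Sandy Ryza's point about spark.yarn.executor.memoryOverhead concrete, here is a back-of-the-envelope sketch of how a container limit relates to the executor heap. The defaults used (a 384 MB floor and a 7% factor) are what I understand Spark 1.2 on YARN to use and should be treated as assumptions; check the documentation for your version. The function name is hypothetical.

```python
def yarn_container_mb(executor_heap_mb, overhead_mb=None,
                      overhead_factor=0.07, overhead_min_mb=384):
    """Approximate memory requested from YARN for one executor, in MB."""
    if overhead_mb is None:
        # Assumed default: the larger of a fixed floor and a fraction of the heap.
        overhead_mb = max(overhead_min_mb, int(executor_heap_mb * overhead_factor))
    return executor_heap_mb + overhead_mb

heap_mb = 2400  # matches -Xmx2400m in the process dump above
default_request = yarn_container_mb(heap_mb)
boosted_request = yarn_container_mb(heap_mb, overhead_mb=1024)
print(default_request, round(default_request / 1024.0, 1))  # MB, then GB
print(boosted_request, round(boosted_request / 1024.0, 1))
```

Under these assumptions a 2400 MB heap yields a request of about 2.7 GB, which is consistent with the limit reported in the log line "2.8 GB of 2.7 GB physical memory used". Raising the overhead widens the gap between heap and limit; the 1024 MB figure is only an example, not a recommendation from the thread.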
Re: Force RDD evaluation
I think the cheapest possible way to force materialization is something like rdd.foreachPartition(i => None). I get the use case, but as you can see there is a cost: you are forced to materialize an RDD and cache it just to measure the computation time. In principle this could take significantly more time than not doing so, since otherwise several RDD stages might proceed without ever having to persist intermediate results in memory. Consider looking at the Spark UI to see how much time a stage took, although it measures end-to-end wall clock time, which may overlap with other computations. (Or maybe you are disabling / enabling this logging for prod / test anyway.) On Sat, Feb 21, 2015 at 4:46 AM, pnpritchard nicholas.pritch...@falkonry.com wrote: Is there a technique for forcing the evaluation of an RDD? I have used actions to do so but even the most basic count has a non-negligible cost (even on a cached RDD, repeated calls to count take time). My use case is for logging the execution time of the major components in my application. At the end of each component I have a statement like rdd.cache().count() and time how long it takes. Thanks in advance for any advice! Nick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Force-RDD-evaluation-tp21748.html
Re: Use Spark Streaming for Batch?
I agree with your assessment as to why it *doesn't* just work. I don't think a small batch duration helps, as all files it sees at the outset are processed in one batch. Your timestamps are a user-space concept, not a framework concept. However, there ought to be a great deal of reusability between the two, so maybe a small refactoring lets you use 95% of it as-is. Isn't the core of your job to process an RDD of timestamp+data together with state to produce new state? If you have the pieces to do that, you should be able to hook them into Spark Streaming, using its timestamp value and its updateStateByKey, but then just as easily point this generic logic at an RDD of historical data and an empty initial state. On Sat, Feb 21, 2015 at 1:05 AM, craigv craigvanderbo...@gmail.com wrote: We have a sophisticated Spark Streaming application that we have been using successfully in production for over a year to process a time series of events. Our application makes novel use of updateStateByKey() for state management. We now have the need to perform exactly the same processing on input data that's not real-time, but has been persisted to disk. We do not want to rewrite our Spark Streaming app unless we have to. /Might it be possible to perform large batch processing on HDFS time series data using Spark Streaming?/ 1. I understand that there is not currently an InputDStream that could do what's needed. I would have to create such a thing. 2. Time is a problem. I would have to use the timestamps on our events for any time-based logic and state management. 3. The batch duration would become meaningless in this scenario. Could I just set it to something really small (say 1 second) and then let it fall behind, processing the data as quickly as it could? It all seems possible. But could Spark Streaming work this way? If I created a DStream that delivered (say) months of events, could Spark Streaming effectively process this in a batch fashion? Any and all comments/ideas welcome!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Use-Spark-Streaming-for-Batch-tp21745.html
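The refactoring suggested in the reply above, keeping the state-update logic generic so it can be fed either by streaming or by a one-off batch, might look roughly like the plain-Python sketch below. It does not use Spark. The state layout (event count, latest timestamp), the record layout (key, timestamp, payload) and both function names are hypothetical; only the idea of a function from (new events, previous state) to new state, which is roughly the shape updateStateByKey expects, comes from the thread.

```python
from collections import defaultdict

def update_state(new_events, old_state):
    """Generic update logic: (new events, previous state or None) -> new state."""
    count, latest = old_state if old_state is not None else (0, None)
    for timestamp, _payload in new_events:
        count += 1
        # Time comes from the event itself, not from the framework's batch time.
        latest = timestamp if latest is None else max(latest, timestamp)
    return (count, latest)

def run_batch(historical_events):
    """Batch driver: group historical records by key and start from empty state."""
    by_key = defaultdict(list)
    for key, timestamp, payload in historical_events:
        by_key[key].append((timestamp, payload))
    return {key: update_state(sorted(events), None) for key, events in by_key.items()}

history = [("sensor-1", 10, "x"), ("sensor-2", 11, "y"), ("sensor-1", 12, "z")]
result = run_batch(history)
# The same function can then continue from the batch result when new events arrive.
continued = update_state([(13, "w")], result["sensor-1"])
print(result, continued)
```

Because update_state takes its notion of time from the events themselves, it does not care whether the events arrive in one-second micro-batches or as months of history in one call, which is the property the reply relies on.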
Re: Perf Prediction
Can you be a bit more specific ? Are you asking about performance across Spark releases ? Cheers On Sat, Feb 21, 2015 at 6:38 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, Has some performance prediction work been done on Spark? Thank You
Re: Worker and Nodes
In this case, I just wanted to know whether a single-node cluster with several workers acts like a simulator of a multi-node cluster with several nodes. For example, if we have a single-node cluster with 10 workers, can we say that the same behavior will occur on a cluster of 10 nodes? That is, without having the 10-node cluster, can I learn the behavior of the application on a 10-node cluster by running a single node with 10 workers? The time taken may vary, but I am talking about the behavior. Can we say that? On Sat, Feb 21, 2015 at 8:21 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I am talking about a standalone single-node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for what is happening? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I am running in standalone mode. I am varying the workers from 1 to 16. On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote: I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?), so you may be executing more remotely. Are you increasing parallelism? For trivial jobs, chopping them up further may cause you to pay more overhead for managing so many small tasks, for no speedup in execution time. Can you provide any more specifics though? You haven't said what you're running, what mode, how many workers, how long it takes, etc. On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have been running some jobs in my local single-node standalone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases as the number of workers increases. I repeated some experiments varying the number of nodes in a cluster too, and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You
Re: Worker and Nodes
There could be many different things causing this. For example, if you only have a single partition of data, increasing the number of tasks will only increase execution time due to higher scheduling overhead. Additionally, how large is a single partition in your application relative to the amount of memory on the machine? If you are running on a machine with a small amount of memory, increasing the number of executors per machine may increase GC/memory pressure. On a single node, since your executors share a memory and I/O system, you could just thrash everything. In any case, you can’t normally generalize between increased parallelism on a single node and increased parallelism across a cluster. If you are purely limited by CPU, then yes, you can normally make that generalization. However, when you increase the number of workers in a cluster, you are providing your app with more resources (memory capacity and bandwidth, and disk bandwidth). When you increase the number of tasks executing on a single node, you do not increase the pool of available resources. Frank Austin Nothaft fnoth...@berkeley.edu fnoth...@eecs.berkeley.edu 202-340-0466 On Feb 21, 2015, at 4:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I just have a single node standalone cluster. I am not tweaking around with the code to increase parallelism. I am just running SparkKMeans that is there in Spark-1.0.0 I just wanted to know, if this behavior is natural. And if so, what causes this? Thank you On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote: What's your storage like? are you adding worker machines that are remote from where the data lives? I wonder if it just means you are spending more and more time sending the data over the network as you try to ship more of it to more remote workers. To answer your question, no in general more workers means more parallelism and therefore faster execution. But that depends on a lot of things. 
For example, if your process isn't parallelized to use all available execution slots, adding more slots doesn't do anything. On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I am talking about a standalone single-node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for what is happening? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I am running in standalone mode. I am varying the workers from 1 to 16. On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote: I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?), so you may be executing more remotely. Are you increasing parallelism? For trivial jobs, chopping them up further may cause you to pay more overhead for managing so many small tasks, for no speedup in execution time. Can you provide any more specifics though? You haven't said what you're running, what mode, how many workers, how long it takes, etc. On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have been running some jobs in my local single-node standalone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases as the number of workers increases. I repeated some experiments varying the number of nodes in a cluster too, and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You
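The remark that extra execution slots do nothing unless the job is split into enough tasks comes down to simple arithmetic, sketched below. The function names and the example of 4 tasks are hypothetical and chosen only for illustration; the sketch ignores scheduling overhead, data locality and memory pressure, which the replies above identify as the reasons a job can actually get slower.

```python
import math


def busy_slots(num_tasks, num_slots):
    """Slots that can do useful work at once: never more than there are tasks."""
    return min(num_tasks, num_slots)


def task_waves(num_tasks, num_slots):
    """Number of rounds needed to run all tasks when each slot runs one task at a time."""
    return math.ceil(num_tasks / num_slots)


# Hypothetical job with 4 tasks (partitions), run with 1 to 16 slots.
num_tasks = 4
for num_slots in (1, 2, 4, 8, 16):
    print(num_slots, busy_slots(num_tasks, num_slots), task_waves(num_tasks, num_slots))
```

Past 4 slots nothing changes in this example: 4 slots are busy and the job takes a single wave, so going from 4 to 16 slots adds idle capacity rather than speed.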
Re: Worker and Nodes
So, with the increase in the number of worker instances, if I also increase the degree of parallelism, will it make any difference? I can use this model even the other way round right? I can always predict the performance of an app with the increase in number of worker instances, the deterioration in performance, right? Thank You On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I have decreased the executor memory. But,if I have to do this, then I have to tweak around with the code corresponding to each configuration right? On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote: Workers has a specific meaning in Spark. You are running many on one machine? that's possible but not usual. Each worker's executors have access to a fraction of your machine's resources then. If you're not increasing parallelism, maybe you're not actually using additional workers, so are using less resource for your problem. Or because the resulting executors are smaller, maybe you're hitting GC thrashing in these executors with smaller heaps. Or if you're not actually configuring the executors to use less memory, maybe you're over-committing your RAM and swapping? Bottom line, you wouldn't use multiple workers on one small standalone node. This isn't a good way to estimate performance on a distributed cluster either. On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I just have a single node standalone cluster. I am not tweaking around with the code to increase parallelism. I am just running SparkKMeans that is there in Spark-1.0.0 I just wanted to know, if this behavior is natural. And if so, what causes this? Thank you On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote: What's your storage like? are you adding worker machines that are remote from where the data lives? 
I wonder if it just means you are spending more and more time sending the data over the network as you try to ship more of it to more remote workers. To answer your question, no in general more workers means more parallelism and therefore faster execution. But that depends on a lot of things. For example, if your process isn't parallelize to use all available execution slots, adding more slots doesn't do anything. On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I am talking about standalone single node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for the happenning? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I have a standalone mode. I am varying the workers from 1 to 16 On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote: I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?) So you may be execute more remotely. Are you increasing parallelism? for trivial jobs, chopping them up further may cause you to pay more overhead of managing so many small tasks, for no speed up in execution time. Can you provide any more specifics though? you haven't said what you're running, what mode, how many workers, how long it takes, etc. On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have been running some jobs in my local single node stand alone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases with increase in the number of workers. I repeated some experiments varying the number of nodes in a cluster too and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You
Re: Worker and Nodes
Yes, I am talking about a standalone single-node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for what is happening? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I am running in standalone mode. I am varying the workers from 1 to 16. On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote: I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?), so you may be executing more remotely. Are you increasing parallelism? For trivial jobs, chopping them up further may cause you to pay more overhead for managing so many small tasks, for no speedup in execution time. Can you provide any more specifics though? You haven't said what you're running, what mode, how many workers, how long it takes, etc. On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have been running some jobs in my local single-node standalone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases as the number of workers increases. I repeated some experiments varying the number of nodes in a cluster too, and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You
Re: Perf Prediction
No, I am talking about work analogous to the performance prediction work that has been done for GPUs. For example, given measurements for a smaller number of nodes in a Spark cluster, predict the time the application would take on a larger number of nodes. On Sat, Feb 21, 2015 at 8:22 PM, Ted Yu yuzhih...@gmail.com wrote: Can you be a bit more specific? Are you asking about performance across Spark releases? Cheers On Sat, Feb 21, 2015 at 6:38 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, Has some performance prediction work been done on Spark? Thank You
Re: Worker and Nodes
What's your storage like? Are you adding worker machines that are remote from where the data lives? I wonder if it just means you are spending more and more time sending the data over the network as you try to ship more of it to more remote workers. To answer your question: no, in general more workers means more parallelism and therefore faster execution. But that depends on a lot of things. For example, if your process isn't parallelized to use all available execution slots, adding more slots doesn't do anything. On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I am talking about a standalone single-node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for what is happening? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I am running in standalone mode. I am varying the workers from 1 to 16. On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote: I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?), so you may be executing more remotely. Are you increasing parallelism? For trivial jobs, chopping them up further may cause you to pay more overhead for managing so many small tasks, for no speedup in execution time. Can you provide any more specifics though? You haven't said what you're running, what mode, how many workers, how long it takes, etc. On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have been running some jobs in my local single-node standalone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases as the number of workers increases. I repeated some experiments varying the number of nodes in a cluster too, and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You
Re: Query data in Spark RDD
Have you looked at http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD ? Cheers On Sat, Feb 21, 2015 at 4:24 AM, Nikhil Bafna nikhil.ba...@flipkart.com wrote: Hi. My use case is building a realtime monitoring system over multi-dimensional data. The way I'm planning to go about it is to use Spark Streaming to store aggregated count over all dimensions in 10 sec interval. Then, from a dashboard, I would be able to specify a query over some dimensions, which will need re-aggregation from the already computed job. My query is, how can I run dynamic queries over data in schema RDDs? -- Nikhil Bafna
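The re-aggregation step described in the question can be sketched in plain Python. This is not the SchemaRDD API; it only shows the operation a dashboard query would need, which in Spark SQL would presumably be a GROUP BY over the stored aggregates. The function name `reaggregate`, the dimension names and the sample counts are all hypothetical.

```python
from collections import Counter


def reaggregate(rows, dimension_names, group_by, filters=None):
    """Roll pre-aggregated counts up to a subset of dimensions.

    rows: list of (dimension_values_tuple, count), one per stored aggregate.
    dimension_names: names of the positions in each dimension tuple.
    group_by: dimension names to keep in the result.
    filters: optional {dimension name: required value}.
    """
    filters = filters or {}
    index = {name: position for position, name in enumerate(dimension_names)}
    totals = Counter()
    for dims, count in rows:
        if all(dims[index[name]] == value for name, value in filters.items()):
            key = tuple(dims[index[name]] for name in group_by)
            totals[key] += count
    return dict(totals)


# Hypothetical aggregates for one 10-second interval.
dimension_names = ("country", "device", "status")
rows = [
    (("SE", "mobile", "ok"), 10),
    (("SE", "desktop", "ok"), 4),
    (("IN", "mobile", "error"), 3),
    (("IN", "mobile", "ok"), 7),
]

by_country = reaggregate(rows, dimension_names, group_by=["country"])
mobile_by_country = reaggregate(
    rows, dimension_names, group_by=["country"], filters={"device": "mobile"}
)
print(by_country)
print(mobile_by_country)
```

Because the stored values are counts, they can be summed again over any subset of dimensions; this would not hold for aggregates such as distinct counts or medians.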
Re: Worker and Nodes
No, I just have a single node standalone cluster. I am not tweaking around with the code to increase parallelism. I am just running SparkKMeans that is there in Spark-1.0.0 I just wanted to know, if this behavior is natural. And if so, what causes this? Thank you On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote: What's your storage like? are you adding worker machines that are remote from where the data lives? I wonder if it just means you are spending more and more time sending the data over the network as you try to ship more of it to more remote workers. To answer your question, no in general more workers means more parallelism and therefore faster execution. But that depends on a lot of things. For example, if your process isn't parallelize to use all available execution slots, adding more slots doesn't do anything. On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I am talking about standalone single node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for the happenning? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I have a standalone mode. I am varying the workers from 1 to 16 On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote: I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?) So you may be execute more remotely. Are you increasing parallelism? for trivial jobs, chopping them up further may cause you to pay more overhead of managing so many small tasks, for no speed up in execution time. Can you provide any more specifics though? you haven't said what you're running, what mode, how many workers, how long it takes, etc. On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have been running some jobs in my local single node stand alone cluster. 
I am varying the worker instances for the same job, and the time taken for the job to complete increases with increase in the number of workers. I repeated some experiments varying the number of nodes in a cluster too and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You
Re: Worker and Nodes
Workers has a specific meaning in Spark. You are running many on one machine? That's possible but not usual. Each worker's executors have access to a fraction of your machine's resources then. If you're not increasing parallelism, maybe you're not actually using additional workers, so are using less resource for your problem. Or because the resulting executors are smaller, maybe you're hitting GC thrashing in these executors with smaller heaps. Or if you're not actually configuring the executors to use less memory, maybe you're over-committing your RAM and swapping? Bottom line, you wouldn't use multiple workers on one small standalone node. This isn't a good way to estimate performance on a distributed cluster either. On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I just have a single-node standalone cluster. I am not tweaking the code to increase parallelism. I am just running the SparkKMeans that ships with Spark-1.0.0. I just wanted to know if this behavior is natural. And if so, what causes this? Thank you On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote: What's your storage like? Are you adding worker machines that are remote from where the data lives? I wonder if it just means you are spending more and more time sending the data over the network as you try to ship more of it to more remote workers. To answer your question: no, in general more workers means more parallelism and therefore faster execution. But that depends on a lot of things. For example, if your process isn't parallelized to use all available execution slots, adding more slots doesn't do anything. On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I am talking about a standalone single-node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for what is happening? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I am running in standalone mode. I am varying the workers from 1 to 16. On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote: I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?), so you may be executing more remotely. Are you increasing parallelism? For trivial jobs, chopping them up further may cause you to pay more overhead for managing so many small tasks, for no speedup in execution time. Can you provide any more specifics though? You haven't said what you're running, what mode, how many workers, how long it takes, etc. On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have been running some jobs in my local single-node standalone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases as the number of workers increases. I repeated some experiments varying the number of nodes in a cluster too, and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You
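The two memory scenarios raised in the reply above (smaller heaps per executor, or over-committed RAM) can be checked with back-of-the-envelope arithmetic. The sketch assumes one executor per worker, and every number in it (a 16 GB machine, 2 GB executors, 1 GB reserved for the OS and daemons) is hypothetical; `executor_memory_gb` merely stands in for whatever value the executors are configured with.

```python
def memory_overcommitted(num_workers, executor_memory_gb, machine_ram_gb, reserved_gb=1.0):
    """True if the executors together ask for more RAM than the machine can give.

    Assumes one executor per worker; reserved_gb is a guess at what the OS
    and the Spark daemons themselves need.
    """
    requested_gb = num_workers * executor_memory_gb
    return requested_gb > machine_ram_gb - reserved_gb


def per_executor_heap_gb(num_workers, machine_ram_gb, reserved_gb=1.0):
    """Largest heap each executor can get if RAM is split evenly without swapping."""
    return (machine_ram_gb - reserved_gb) / num_workers


# Hypothetical 16 GB machine with executors left at 2 GB each.
for num_workers in (1, 2, 4, 8, 16):
    print(
        num_workers,
        memory_overcommitted(num_workers, 2, 16),
        per_executor_heap_gb(num_workers, 16),
    )
```

With these made-up numbers, 8 or more workers at 2 GB each exceed the 15 GB available, which is the swapping case; shrinking the executors to fit leaves each of 16 executors with under 1 GB of heap, which is the GC-pressure case.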
Re: Worker and Nodes
Yes, I have decreased the executor memory. But,if I have to do this, then I have to tweak around with the code corresponding to each configuration right? On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote: Workers has a specific meaning in Spark. You are running many on one machine? that's possible but not usual. Each worker's executors have access to a fraction of your machine's resources then. If you're not increasing parallelism, maybe you're not actually using additional workers, so are using less resource for your problem. Or because the resulting executors are smaller, maybe you're hitting GC thrashing in these executors with smaller heaps. Or if you're not actually configuring the executors to use less memory, maybe you're over-committing your RAM and swapping? Bottom line, you wouldn't use multiple workers on one small standalone node. This isn't a good way to estimate performance on a distributed cluster either. On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I just have a single node standalone cluster. I am not tweaking around with the code to increase parallelism. I am just running SparkKMeans that is there in Spark-1.0.0 I just wanted to know, if this behavior is natural. And if so, what causes this? Thank you On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote: What's your storage like? are you adding worker machines that are remote from where the data lives? I wonder if it just means you are spending more and more time sending the data over the network as you try to ship more of it to more remote workers. To answer your question, no in general more workers means more parallelism and therefore faster execution. But that depends on a lot of things. For example, if your process isn't parallelize to use all available execution slots, adding more slots doesn't do anything. 
On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I am talking about standalone single node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for the happenning? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I have a standalone mode. I am varying the workers from 1 to 16 On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote: I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?) So you may be execute more remotely. Are you increasing parallelism? for trivial jobs, chopping them up further may cause you to pay more overhead of managing so many small tasks, for no speed up in execution time. Can you provide any more specifics though? you haven't said what you're running, what mode, how many workers, how long it takes, etc. On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have been running some jobs in my local single node stand alone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases with increase in the number of workers. I repeated some experiments varying the number of nodes in a cluster too and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You
Re: java.io.IOException: Filesystem closed
Are you replicating any RDDs? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-Filesystem-closed-tp20150p21749.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Worker and Nodes
So, if I keep the number of instances constant and increase the degree of parallelism in steps, can I expect the performance to increase? Thank You On Sat, Feb 21, 2015 at 9:07 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: So, with the increase in the number of worker instances, if I also increase the degree of parallelism, will it make any difference? I can use this model even the other way round right? I can always predict the performance of an app with the increase in number of worker instances, the deterioration in performance, right? Thank You On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I have decreased the executor memory. But,if I have to do this, then I have to tweak around with the code corresponding to each configuration right? On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote: Workers has a specific meaning in Spark. You are running many on one machine? that's possible but not usual. Each worker's executors have access to a fraction of your machine's resources then. If you're not increasing parallelism, maybe you're not actually using additional workers, so are using less resource for your problem. Or because the resulting executors are smaller, maybe you're hitting GC thrashing in these executors with smaller heaps. Or if you're not actually configuring the executors to use less memory, maybe you're over-committing your RAM and swapping? Bottom line, you wouldn't use multiple workers on one small standalone node. This isn't a good way to estimate performance on a distributed cluster either. On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: No, I just have a single node standalone cluster. I am not tweaking around with the code to increase parallelism. I am just running SparkKMeans that is there in Spark-1.0.0 I just wanted to know, if this behavior is natural. And if so, what causes this? 
Thank you On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote: What's your storage like? are you adding worker machines that are remote from where the data lives? I wonder if it just means you are spending more and more time sending the data over the network as you try to ship more of it to more remote workers. To answer your question, no in general more workers means more parallelism and therefore faster execution. But that depends on a lot of things. For example, if your process isn't parallelize to use all available execution slots, adding more slots doesn't do anything. On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Yes, I am talking about standalone single node cluster. No, I am not increasing parallelism. I just wanted to know if it is natural. Does message passing across the workers account for the happenning? I am running SparkKMeans, just to validate one prediction model. I am using several data sets. I have a standalone mode. I am varying the workers from 1 to 16 On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote: I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?) So you may be execute more remotely. Are you increasing parallelism? for trivial jobs, chopping them up further may cause you to pay more overhead of managing so many small tasks, for no speed up in execution time. Can you provide any more specifics though? you haven't said what you're running, what mode, how many workers, how long it takes, etc. On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have been running some jobs in my local single node stand alone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases with increase in the number of workers. I repeated some experiments varying the number of nodes in a cluster too and the same behavior is seen. 
Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You
Missing shuffle files
For large jobs, the following error message is shown that seems to indicate that shuffle files for some reason are missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (build at 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net): java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

TIA, Anders
Re: Missing shuffle files
I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. Thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3TB of memory allocated for the application. I was thinking perhaps it was possible that a single executor was getting one or a couple of large partitions, but shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote: For large jobs, the following error message is shown that seems to indicate that shuffle files for some reason are missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (build at 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net): java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

TIA, Anders
Query data in Spark RDD
Hi. My use case is building a realtime monitoring system over multi-dimensional data. The way I'm planning to go about it is to use Spark Streaming to store aggregated count over all dimensions in 10 sec interval. Then, from a dashboard, I would be able to specify a query over some dimensions, which will need re-aggregation from the already computed job. My query is, how can I run dynamic queries over data in schema RDDs? -- Nikhil Bafna
Perf Prediction
Hi, Has any performance prediction work been done on Spark? Thank You
Re: Worker and Nodes
I can imagine a few reasons. Adding workers might cause fewer tasks to execute locally (?), so you may be executing more of them remotely. Are you increasing parallelism? For trivial jobs, chopping them up further may cause you to pay more overhead for managing so many small tasks, with no speedup in execution time. Can you provide any more specifics, though? You haven't said what you're running, what mode, how many workers, how long it takes, etc. On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have been running some jobs in my local single-node standalone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases with increase in the number of workers. I repeated some experiments varying the number of nodes in a cluster too and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
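The point about per-task overhead on trivial jobs can be illustrated with a toy cost model. This is only a sketch under stated assumptions, not a description of how Spark schedules work: it assumes tasks run in waves of one task per core, that the work splits evenly, and that every task pays a fixed overhead. The function name and all numbers are made up for illustration.

```python
import math


def estimated_job_seconds(total_work_s, num_tasks, cores, per_task_overhead_s):
    """Toy model: tasks run in waves of `cores`; each task pays a fixed overhead."""
    task_seconds = total_work_s / num_tasks + per_task_overhead_s
    waves = math.ceil(num_tasks / cores)
    return waves * task_seconds


# Assumed numbers, for illustration only: 8 s of real work on 4 cores,
# with 0.2 s of scheduling/launch overhead per task.
for num_tasks in (4, 40, 400):
    seconds = estimated_job_seconds(8.0, num_tasks, cores=4, per_task_overhead_s=0.2)
    print(num_tasks, round(seconds, 2))
```

With the core count fixed, the useful work per wave shrinks as the job is split further, but the fixed overhead is paid once per task, so in this model the estimated time grows with the number of tasks.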
Worker and Nodes
Hi, I have been running some jobs in my local single-node standalone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases with increase in the number of workers. I repeated some experiments varying the number of nodes in a cluster too and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You
Re: Worker and Nodes
Hi, I have experienced the same behavior. You are talking about standalone cluster mode, right? BR On 21 February 2015 at 14:37, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I have been running some jobs in my local single-node standalone cluster. I am varying the worker instances for the same job, and the time taken for the job to complete increases with increase in the number of workers. I repeated some experiments varying the number of nodes in a cluster too and the same behavior is seen. Can the idea of worker instances be extrapolated to the nodes in a cluster? Thank You
Re: Posting to the list
The message went through after all. Sorry for spamming. On 21.2.2015. 21:27, pzecevic wrote: Hi Spark users. Does anybody know what steps are required to be able to post to this list by sending an email to user@spark.apache.org? I just sent a reply to Corey Nolet's mail "Missing shuffle files" but I don't think it was accepted by the engine. If I look at the Spark user list, I don't see this topic ("Missing shuffle files") at all: http://apache-spark-user-list.1001560.n3.nabble.com/ I can see it in the archives, though: https://mail-archives.apache.org/mod_mbox/spark-user/201502.mbox/browser but my answer is not there. This is not the first time this has happened and I am wondering what is going on. Is the engine eating my emails? Does it not like me? I am subscribed to the list and I have a Nabble account. I previously saw one of my emails marked with "This message has not been accepted by the mailing list yet." I read what that means, but I don't think it applies to me. What am I missing? P.S.: I am posting this through the Nabble web interface. Hope it gets through... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Posting-to-the-list-tp21750.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Spark performance tuning
Can someone share some ideas about how to tune the GC time? Thanks
From: java8...@hotmail.com To: user@spark.apache.org Subject: Spark performance tuning Date: Fri, 20 Feb 2015 16:04:23 -0500
Hi, I am new to Spark, and I am trying to test Spark SQL performance against Hive. I set up a standalone box with 24 cores and 64G of memory. We have one SQL query in mind to test. Here is the basic setup on this one box for the SQL we are trying to run:
1) Dataset1, a 6.6G AVRO file with snappy compression, which contains a nested structure of 3 arrays of structs in AVRO
2) Dataset2, a 5G AVRO file with snappy compression
3) Dataset3, a 2.3M AVRO file with snappy compression
The basic structure of the query is like this:
(select xxx from dataset1 lateral view outer explode(struct1) lateral view outer explode(struct2) where x )
left outer join (select from dataset2 lateral view explode(xxx) where ) on
left outer join (select xxx from dataset3 where ) on x
So overall what it does is 2 outer explodes on dataset1, a left outer join with the explode of dataset2, then finally a left outer join with dataset3. On this standalone box, I installed Hadoop 2.2, Hive 0.12, and Spark 1.2.0. As a baseline, the above query finishes in around 50 minutes in Hive 0.12, with 6 mappers and 3 reducers, each with 1G max heap, in 3 rounds of MR jobs. This is a very expensive query that runs in our production every day, of course with a much bigger data set. Now I want to see how fast Spark can run the same query. I am using the following settings, based on my understanding of Spark, for a fair test between it and Hive:
export SPARK_WORKER_MEMORY=32g
export SPARK_DRIVER_MEMORY=2g
--executor-memory 9g --total-executor-cores 9
I am trying to run one executor with 9 cores and a max 9G heap, to make Spark use almost the same resources we gave to MapReduce.
Here is the result without any additional configuration changes, running under Spark 1.2.0, using HiveContext in Spark SQL to run exactly the same query. Spark SQL generated 5 stages of tasks, shown below (stage id, description, submitted, duration, tasks succeeded/total, then the data sizes as listed in the UI):
4  collect at SparkPlan.scala:84     2015/02/20 10:48:46  26 s     200/200
3  mapPartitions at Exchange.scala:64  2015/02/20 10:32:07  16 min   200/200  1112.3 MB
2  mapPartitions at Exchange.scala:64  2015/02/20 10:22:06  9 min    40/40    4.7 GB  22.2 GB
1  mapPartitions at Exchange.scala:64  2015/02/20 10:22:06  1.9 min  50/50    6.2 GB  2.8 GB
0  mapPartitions at Exchange.scala:64  2015/02/20 10:22:06  6 s      2/2      2.3 MB  156.6 KB
So the wall time of the whole query is 26s + 16m + 9m + 2m + 6s, around 28 minutes. That is about 56% of the original time, not bad. But I want to know whether any tuning of Spark can make it even faster. For stages 2 and 3, I observed that GC time becomes more and more expensive, especially in stage 3, shown below:
Metric         Min     25th percentile  Median  75th percentile  Max
Duration       20 s    30 s             35 s    39 s             2.4 min
GC Time        9 s     17 s             20 s    25 s             2.2 min
Shuffle Write  4.7 MB  4.9 MB           5.2 MB  6.1 MB           8.3 MB
So at the median, GC took 20s/35s = 57% of the task time. The first change I made was to add the following line to spark-defaults.conf: spark.serializer org.apache.spark.serializer.KryoSerializer My assumption is that using KryoSerializer, instead of the default Java serializer, will lower the memory footprint and so should lower the GC pressure at runtime. I know that I changed the correct spark-defaults.conf, because if I were to add spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to the same file, I would see the GC usage in the stdout file. Of course, in this test I didn't add that, as I want to make only one change at a time. The result is almost the same as with the standard Java serializer.
The wall time is still 28 minutes, and in stage 3 the GC still took around 50 to 60% of the time, with almost the same results from min through median to max, without any noticeable performance gain. Next, based on my understanding, and for this test, I think the default spark.storage.memoryFraction is too high for this query, as there is no reason to reserve so much memory for caching data, because we don't reuse any dataset in this one query. So I added this at the end of the spark-shell command: --conf spark.storage.memoryFraction=0.3, as I want to reserve only half as much memory for caching data as in the first run. Of course, this time I rolled back the first change (KryoSerializer). The result looks almost the same. The whole query finished in around 28s + 14m + 9.6m + 1.9m + 6s = 27 minutes. It looks like Spark is faster than Hive, but are there any steps I can take to make it even faster? Why does using KryoSerializer make no difference? If I want to use the same resources as now, is there anything I can do to speed it up?
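The wall-time and GC-share arithmetic in the message above can be rechecked with a few lines of Python. This is just a sketch that sums the stage durations as reported in the post (using the 1.9 min figure from the stage list for stage 1); the helper name `total_minutes` is made up for illustration.

```python
def total_minutes(durations):
    """Sum durations given as (value, unit) pairs, where unit is 's' or 'min'."""
    seconds = sum(value * 60 if unit == "min" else value for value, unit in durations)
    return seconds / 60


# Stage durations as reported in the post.
first_run = [(26, "s"), (16, "min"), (9, "min"), (1.9, "min"), (6, "s")]
second_run = [(28, "s"), (14, "min"), (9.6, "min"), (1.9, "min"), (6, "s")]
hive_baseline_min = 50

first = total_minutes(first_run)
second = total_minutes(second_run)
gc_share = 20 / 35  # median GC time over median task duration in stage 3

print(f"first run:  {first:.1f} min ({first / hive_baseline_min:.0%} of the Hive baseline)")
print(f"second run: {second:.1f} min")
print(f"median GC share in stage 3: {gc_share:.0%}")
```

With these inputs the first run sums to roughly 27.4 minutes and the second to roughly 26.1 minutes, so the quoted 28 and 27 minutes are rounded figures and the difference between the two runs is about a minute and a half.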
Posting to the list
Hi Spark users. Does anybody know what steps are required to be able to post to this list by sending an email to user@spark.apache.org? I just sent a reply to Corey Nolet's mail "Missing shuffle files" but I don't think it was accepted by the engine. If I look at the Spark user list, I don't see this topic ("Missing shuffle files") at all: http://apache-spark-user-list.1001560.n3.nabble.com/ I can see it in the archives, though: https://mail-archives.apache.org/mod_mbox/spark-user/201502.mbox/browser but my answer is not there. This is not the first time this has happened and I am wondering what is going on. Is the engine eating my emails? Does it not like me? I am subscribed to the list and I have a Nabble account. I previously saw one of my emails marked with "This message has not been accepted by the mailing list yet." I read what that means, but I don't think it applies to me. What am I missing? P.S.: I am posting this through the Nabble web interface. Hope it gets through... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Posting-to-the-list-tp21750.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Missing shuffle files
Could you try to turn on the external shuffle service? spark.shuffle.service.enabled=true On 21.2.2015. 17:50, Corey Nolet wrote: I'm experiencing the same issue. Upon closer inspection I'm noticing that executors are being lost as well. Thing is, I can't figure out how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3TB of memory allocated for the application. I was thinking perhaps it was possible that a single executor was getting a single or a couple of large partitions, but shouldn't the disk persistence kick in at that point? On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote: For large jobs, the following error message is shown that seems to indicate that shuffle files for some reason are missing. It's a rather large job with many partitions. If the data size is reduced, the problem disappears. I'm running a build from Spark master post 1.2 (build at 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this problem?
User class threw exception: Job aborted due to stage failure: Task 450 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net): java.io.FileNotFoundException: /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
TIA, Anders
Re: Which OutputCommitter to use for S3?
Josh, is that class something you guys would consider open sourcing, or would you rather the community step up and create an OutputCommitter implementation optimized for S3? On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote: We (Databricks) use our own DirectOutputCommitter implementation, which is a couple tens of lines of Scala code. The class would almost entirely be a no-op except we took some care to properly handle the _SUCCESS file. On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote: I didn’t get any response. It’d be really appreciated if anyone using a special OutputCommitter for S3 can comment on this! Thanks, Mingyu From: Mingyu Kim m...@palantir.com Date: Monday, February 16, 2015 at 1:15 AM To: user@spark.apache.org Subject: Which OutputCommitter to use for S3? Hi all, The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to require moving files at the commit step, which is not a constant-time operation in S3, as discussed in http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3C543E33FA.2000802@entropy.be%3E.
People seem to develop their own NullOutputCommitter implementation or use DirectFileOutputCommitter (as mentioned in SPARK-3595, https://issues.apache.org/jira/browse/SPARK-3595), but I wanted to check if there is a de facto standard, publicly available OutputCommitter to use for S3 in conjunction with Spark. Thanks, Mingyu