Re: Worker and Nodes

2015-02-21 Thread Deep Pradhan
 So increasing Executors without increasing physical resources
If I have a system with 16 GB of RAM and I allocate 1 GB to each executor,
setting the number of executors to 8, then I am increasing the resources,
right? How do you explain this case?

Thank You

On Sun, Feb 22, 2015 at 6:12 AM, Aaron Davidson ilike...@gmail.com wrote:

 Note that the parallelism (i.e., number of partitions) is just an upper
 bound on how much of the work can be done in parallel. If you have 200
 partitions, then you can divide the work among between 1 and 200 cores and
 all resources will remain utilized. If you have more than 200 cores,
 though, then some will not be used, so you would want to increase
 parallelism further. (There are other rules-of-thumb -- for instance, it's
 generally good to have at least 2x more partitions than cores for straggler
 mitigation, but these are essentially just optimizations.)

 Further note that when you increase the number of Executors for the same
 set of resources (i.e., starting 10 Executors on a single machine instead
 of 1), you make Spark's job harder. Spark has to communicate in an
 all-to-all manner across Executors for shuffle operations, and it uses TCP
 sockets to do so whether or not the Executors happen to be on the same
 machine. So increasing Executors without increasing physical resources
 means Spark has to do more communication to do the same work.

 We expect that increasing the number of Executors by a factor of 10, given
 an increase in the number of physical resources by the same factor, would
 also improve performance by 10x. This is not always the case for the
 precise reason above (increased communication overhead), but typically we
 can get close. The actual observed improvement is very algorithm-dependent,
 though; for instance, some ML algorithms become hard to scale out past a
 certain point because the increase in communication overhead outweighs the
 increase in parallelism.

 On Sat, Feb 21, 2015 at 8:19 AM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 So, if I keep the number of instances constant and increase the degree of
 parallelism in steps, can I expect the performance to increase?

 Thank You

 On Sat, Feb 21, 2015 at 9:07 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 So, with the increase in the number of worker instances, if I also
 increase the degree of parallelism, will it make any difference?
  I can use this model the other way round too, right? That is, I can
  always predict the deterioration in an app's performance as the number of
  worker instances increases, right?

 Thank You

 On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan pradhandeep1...@gmail.com
  wrote:

 Yes, I have decreased the executor memory.
 But, if I have to do this, then I have to tweak around with the code
 corresponding to each configuration right?

 On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote:

 "Workers" has a specific meaning in Spark. You are running many on one
 machine? that's possible but not usual.

 Each worker's executors have access to a fraction of your machine's
 resources then. If you're not increasing parallelism, maybe you're not
 actually using additional workers, so are using less resource for your
 problem.

 Or because the resulting executors are smaller, maybe you're hitting
 GC thrashing in these executors with smaller heaps.

 Or if you're not actually configuring the executors to use less
 memory, maybe you're over-committing your RAM and swapping?

 Bottom line, you wouldn't use multiple workers on one small standalone
 node. This isn't a good way to estimate performance on a distributed
 cluster either.

 On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan 
 pradhandeep1...@gmail.com wrote:
  No, I just have a single node standalone cluster.
 
  I am not tweaking around with the code to increase parallelism. I am
 just
  running SparkKMeans that is there in Spark-1.0.0
  I just wanted to know, if this behavior is natural. And if so, what
 causes
  this?
 
  Thank you
 
  On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com
 wrote:
 
  What's your storage like? are you adding worker machines that are
  remote from where the data lives? I wonder if it just means you are
  spending more and more time sending the data over the network as you
  try to ship more of it to more remote workers.
 
   To answer your question, no, in general more workers means more
  parallelism and therefore faster execution. But that depends on a
 lot
   of things. For example, if your process isn't parallelized to use all
  available execution slots, adding more slots doesn't do anything.
 
  On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan 
 pradhandeep1...@gmail.com
  wrote:
   Yes, I am talking about standalone single node cluster.
  
   No, I am not increasing parallelism. I just wanted to know if it
 is
   natural.
   Does message passing across the workers account for the
  happening?
  
   I am running SparkKMeans, just to 

Re: Perf Prediction

2015-02-21 Thread Deep Pradhan
Has anyone done any work on that?

On Sun, Feb 22, 2015 at 9:57 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Yes, exactly.

 On Sun, Feb 22, 2015 at 9:10 AM, Ognen Duzlevski 
 ognen.duzlev...@gmail.com wrote:

 On Sat, Feb 21, 2015 at 8:54 AM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 No, I am talking about work parallel to the prediction work done on GPUs.
 Say, given data for a smaller number of nodes in a Spark cluster, the
 prediction needs to estimate the time the application would take when we
 have a larger number of nodes.


 Are you talking about predicting how performance would increase with
 adding more nodes/CPUs/whatever?





Re: Worker and Nodes

2015-02-21 Thread Aaron Davidson
Note that the parallelism (i.e., number of partitions) is just an upper
bound on how much of the work can be done in parallel. If you have 200
partitions, then you can divide the work among between 1 and 200 cores and
all resources will remain utilized. If you have more than 200 cores,
though, then some will not be used, so you would want to increase
parallelism further. (There are other rules-of-thumb -- for instance, it's
generally good to have at least 2x more partitions than cores for straggler
mitigation, but these are essentially just optimizations.)
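
A minimal sketch of that rule of thumb (Spark 1.x Scala API; the
SparkContext sc, input path, and core count are made up for illustration):

    import org.apache.spark.rdd.RDD

    val data: RDD[String] = sc.textFile("hdfs:///input")   // hypothetical path
    val totalCores = 16                                    // illustrative cluster size
    // aim for at least 2x partitions per core (straggler mitigation):
    val tuned =
      if (data.partitions.length < 2 * totalCores) data.repartition(2 * totalCores)
      else data
    println(tuned.partitions.length)   // upper bound on concurrently running tasks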

Further note that when you increase the number of Executors for the same
set of resources (i.e., starting 10 Executors on a single machine instead
of 1), you make Spark's job harder. Spark has to communicate in an
all-to-all manner across Executors for shuffle operations, and it uses TCP
sockets to do so whether or not the Executors happen to be on the same
machine. So increasing Executors without increasing physical resources
means Spark has to do more communication to do the same work.

We expect that increasing the number of Executors by a factor of 10, given
an increase in the number of physical resources by the same factor, would
also improve performance by 10x. This is not always the case for the
precise reason above (increased communication overhead), but typically we
can get close. The actual observed improvement is very algorithm-dependent,
though; for instance, some ML algorithms become hard to scale out past a
certain point because the increase in communication overhead outweighs the
increase in parallelism.

On Sat, Feb 21, 2015 at 8:19 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 So, if I keep the number of instances constant and increase the degree of
 parallelism in steps, can I expect the performance to increase?

 Thank You

 On Sat, Feb 21, 2015 at 9:07 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 So, with the increase in the number of worker instances, if I also
 increase the degree of parallelism, will it make any difference?
 I can use this model the other way round too, right? That is, I can always
 predict the deterioration in an app's performance as the number of worker
 instances increases, right?

 Thank You

 On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 Yes, I have decreased the executor memory.
 But, if I have to do this, then I have to tweak around with the code
 corresponding to each configuration right?

 On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote:

 "Workers" has a specific meaning in Spark. You are running many on one
 machine? that's possible but not usual.

 Each worker's executors have access to a fraction of your machine's
 resources then. If you're not increasing parallelism, maybe you're not
 actually using additional workers, so are using less resource for your
 problem.

 Or because the resulting executors are smaller, maybe you're hitting
 GC thrashing in these executors with smaller heaps.

 Or if you're not actually configuring the executors to use less
 memory, maybe you're over-committing your RAM and swapping?

 Bottom line, you wouldn't use multiple workers on one small standalone
 node. This isn't a good way to estimate performance on a distributed
 cluster either.

 On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan 
 pradhandeep1...@gmail.com wrote:
  No, I just have a single node standalone cluster.
 
  I am not tweaking around with the code to increase parallelism. I am
 just
  running SparkKMeans that is there in Spark-1.0.0
  I just wanted to know, if this behavior is natural. And if so, what
 causes
  this?
 
  Thank you
 
  On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com
 wrote:
 
  What's your storage like? are you adding worker machines that are
  remote from where the data lives? I wonder if it just means you are
  spending more and more time sending the data over the network as you
  try to ship more of it to more remote workers.
 
   To answer your question, no, in general more workers means more
  parallelism and therefore faster execution. But that depends on a lot
   of things. For example, if your process isn't parallelized to use all
  available execution slots, adding more slots doesn't do anything.
 
  On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan 
 pradhandeep1...@gmail.com
  wrote:
   Yes, I am talking about standalone single node cluster.
  
   No, I am not increasing parallelism. I just wanted to know if it is
   natural.
   Does message passing across the workers account for the happening?
  
   I am running SparkKMeans, just to validate one prediction model. I
 am
   using
   several data sets. I have a standalone mode. I am varying the
 workers
   from 1
   to 16
  
   On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com
 wrote:
  
   I can imagine a few reasons. Adding workers might cause fewer
 tasks to
   execute locally (?) So you may be executing more remotely.
  
   Are you 

Executor size and checkpoints

2015-02-21 Thread Yana Kadiyska
Hi all,

I had a streaming application, and midway through things I decided to up
the executor memory. I spent a long time launching it like this:

~/spark-1.2.0-bin-cdh4/bin/spark-submit --class StreamingTest
--executor-memory 2G --master...

and observing that the executor memory was still at the old 512 MB setting.

I was about to ask if this is a bug when I decided to delete the
checkpoints. Sure enough, the setting took effect after that.

So my question is: why is it required to remove checkpoints to increase the
memory allowed on an executor? This seems pretty unintuitive to me.
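
For context, here is the shape of my recovery path (a minimal sketch on the
Spark 1.2 streaming API; the checkpoint path and batch duration are made
up). As I understand it, getOrCreate rebuilds the context, including the old
SparkConf, from the serialized checkpoint, and only calls the creation
function when no checkpoint exists:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs:///tmp/streaming-checkpoint"   // hypothetical path
    def create(): StreamingContext = {
      // only runs on a fresh start; afterwards the conf comes from the checkpoint
      val conf = new SparkConf().setAppName("StreamingTest")
      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint(checkpointDir)
      ssc
    }
    val ssc = StreamingContext.getOrCreate(checkpointDir, create _)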

Thanks for any insights.


Re: Perf Prediction

2015-02-21 Thread Deep Pradhan
Yes, exactly.

On Sun, Feb 22, 2015 at 9:10 AM, Ognen Duzlevski ognen.duzlev...@gmail.com
wrote:

 On Sat, Feb 21, 2015 at 8:54 AM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 No, I am talking about work parallel to the prediction work done on GPUs.
 Say, given data for a smaller number of nodes in a Spark cluster, the
 prediction needs to estimate the time the application would take when we
 have a larger number of nodes.


 Are you talking about predicting how performance would increase with
 adding more nodes/CPUs/whatever?



Re: Worker and Nodes

2015-02-21 Thread Deep Pradhan
Also, if I take SparkPageRank as an example (org.apache.spark.examples),
there are various RDDs that are created and transformed in the code. If I
want to find the optimum number of partitions that gives me the best
performance, I have to change the number of partitions in each run, right?
Now, there are various RDDs there, so which RDD do I partition? In other
words, if I partition the first RDD that is created from the data in HDFS,
am I assured that the other RDDs transformed from this RDD will also be
partitioned in the same way?
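
For instance, a minimal sketch of what I am asking about (Spark 1.x Scala
API; the path and partition count are made up):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair-RDD functions in Spark 1.x

    // (url, neighbour) pairs, as in SparkPageRank:
    val links = sc.textFile("hdfs:///links.txt")
      .map { line => val p = line.split("\\s+"); (p(0), p(1)) }
      .partitionBy(new HashPartitioner(64))   // illustrative partition count
      .cache()
    // key-preserving transformations keep the partitioner:
    println(links.mapValues(identity).partitioner)   // Some(HashPartitioner@...)
    // map may change the keys, so the partitioner is dropped:
    println(links.map(identity).partitioner)         // None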

Thank You

On Sun, Feb 22, 2015 at 10:02 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

  So increasing Executors without increasing physical resources
 If I have a system with 16 GB of RAM and I allocate 1 GB to each executor,
 setting the number of executors to 8, then I am increasing the resources,
 right? How do you explain this case?

 Thank You

 On Sun, Feb 22, 2015 at 6:12 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 Note that the parallelism (i.e., number of partitions) is just an upper
 bound on how much of the work can be done in parallel. If you have 200
 partitions, then you can divide the work among between 1 and 200 cores and
 all resources will remain utilized. If you have more than 200 cores,
 though, then some will not be used, so you would want to increase
 parallelism further. (There are other rules-of-thumb -- for instance, it's
 generally good to have at least 2x more partitions than cores for straggler
 mitigation, but these are essentially just optimizations.)

 Further note that when you increase the number of Executors for the same
 set of resources (i.e., starting 10 Executors on a single machine instead
 of 1), you make Spark's job harder. Spark has to communicate in an
 all-to-all manner across Executors for shuffle operations, and it uses TCP
 sockets to do so whether or not the Executors happen to be on the same
 machine. So increasing Executors without increasing physical resources
 means Spark has to do more communication to do the same work.

 We expect that increasing the number of Executors by a factor of 10,
 given an increase in the number of physical resources by the same factor,
 would also improve performance by 10x. This is not always the case for the
 precise reason above (increased communication overhead), but typically we
 can get close. The actual observed improvement is very algorithm-dependent,
 though; for instance, some ML algorithms become hard to scale out past a
 certain point because the increase in communication overhead outweighs the
 increase in parallelism.

 On Sat, Feb 21, 2015 at 8:19 AM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 So, if I keep the number of instances constant and increase the degree
 of parallelism in steps, can I expect the performance to increase?

 Thank You

 On Sat, Feb 21, 2015 at 9:07 PM, Deep Pradhan pradhandeep1...@gmail.com
  wrote:

 So, with the increase in the number of worker instances, if I also
 increase the degree of parallelism, will it make any difference?
  I can use this model the other way round too, right? That is, I can
  always predict the deterioration in an app's performance as the number of
  worker instances increases, right?

 Thank You

 On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan 
 pradhandeep1...@gmail.com wrote:

 Yes, I have decreased the executor memory.
 But, if I have to do this, then I have to tweak around with the code
 corresponding to each configuration right?

 On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote:

 "Workers" has a specific meaning in Spark. You are running many on one
 machine? that's possible but not usual.

 Each worker's executors have access to a fraction of your machine's
 resources then. If you're not increasing parallelism, maybe you're not
 actually using additional workers, so are using less resource for your
 problem.

 Or because the resulting executors are smaller, maybe you're hitting
 GC thrashing in these executors with smaller heaps.

 Or if you're not actually configuring the executors to use less
 memory, maybe you're over-committing your RAM and swapping?

 Bottom line, you wouldn't use multiple workers on one small standalone
 node. This isn't a good way to estimate performance on a distributed
 cluster either.

 On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan 
 pradhandeep1...@gmail.com wrote:
  No, I just have a single node standalone cluster.
 
  I am not tweaking around with the code to increase parallelism. I
 am just
  running SparkKMeans that is there in Spark-1.0.0
  I just wanted to know, if this behavior is natural. And if so, what
 causes
  this?
 
  Thank you
 
  On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com
 wrote:
 
  What's your storage like? are you adding worker machines that are
  remote from where the data lives? I wonder if it just means you are
  spending more and more time 

Re: Query data in Spark RRD

2015-02-21 Thread Nikhil Bafna
Yes. As I understand it, it would allow me to write SQL to query a Spark
context. But the query needs to be specified within a job & deployed.

What I want is to be able to run multiple dynamic queries specified at
runtime from a dashboard.
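
Roughly what I have in mind (a rough sketch on Spark 1.2; the schema and
names are made up) -- the SQL text would arrive from the dashboard at
runtime:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD   // implicit RDD -> SchemaRDD in 1.2

    case class Agg(dim1: String, dim2: String, n: Long)    // hypothetical schema
    val aggregates = sc.parallelize(Seq(Agg("a", "b", 3L)))
    aggregates.registerTempTable("aggregates")

    def run(query: String) = sqlContext.sql(query).collect()
    run("SELECT dim1, SUM(n) FROM aggregates GROUP BY dim1")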



--
Nikhil Bafna

On Sat, Feb 21, 2015 at 8:37 PM, Ted Yu yuzhih...@gmail.com wrote:

 Have you looked at
 http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
 ?

 Cheers

 On Sat, Feb 21, 2015 at 4:24 AM, Nikhil Bafna nikhil.ba...@flipkart.com
 wrote:


 Hi.

 My use case is building a realtime monitoring system over
 multi-dimensional data.

 The way I'm planning to go about it is to use Spark Streaming to store
 aggregated count over all dimensions in 10 sec interval.

 Then, from a dashboard, I would be able to specify a query over some
 dimensions, which will need re-aggregation from the already computed job.

 My query is, how can I run dynamic queries over data in schema RDDs?

 --
 Nikhil Bafna





Re: Perf Prediction

2015-02-21 Thread Ognen Duzlevski
On Sat, Feb 21, 2015 at 8:54 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 No, I am talking about work parallel to the prediction work done on GPUs.
 Say, given data for a smaller number of nodes in a Spark cluster, the
 prediction needs to estimate the time the application would take when we
 have a larger number of nodes.


Are you talking about predicting how performance would increase with adding
more nodes/CPUs/whatever?


Re: Which OutputCommitter to use for S3?

2015-02-21 Thread Aaron Davidson
Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec

You can use it by setting mapred.output.committer.class in the Hadoop
configuration (or spark.hadoop.mapred.output.committer.class in the Spark
configuration). Note that this only works for the old Hadoop APIs, I
believe the new Hadoop APIs strongly tie committer to input format (so
FileInputFormat always uses FileOutputCommitter), which makes this fix more
difficult to apply.
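
For example (a hedged sketch; the fully-qualified class name depends on how
you package the gist):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.hadoop.mapred.output.committer.class",
           "com.example.DirectOutputCommitter")   // hypothetical package/name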

On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote:

 Josh is that class something you guys would consider open sourcing, or
 would you rather the community step up and create an OutputCommitter
 implementation optimized for S3?

 On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote:

 We (Databricks) use our own DirectOutputCommitter implementation, which
 is a couple tens of lines of Scala code.  The class would almost entirely
 be a no-op except we took some care to properly handle the _SUCCESS file.
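
Roughly (a sketch of the idea, not our actual class, and omitting the
_SUCCESS handling), a no-op committer against the old mapred API looks
something like:

    import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

    class DirectOutputCommitter extends OutputCommitter {
      // tasks write straight to the final location, so there is nothing to move
      override def setupJob(jobContext: JobContext): Unit = ()
      override def setupTask(taskContext: TaskAttemptContext): Unit = ()
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
      override def commitTask(taskContext: TaskAttemptContext): Unit = ()
      override def abortTask(taskContext: TaskAttemptContext): Unit = ()
    }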

 On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote:

  I didn’t get any response. It’d be really appreciated if anyone using
 a special OutputCommitter for S3 can comment on this!

  Thanks,
 Mingyu

   From: Mingyu Kim m...@palantir.com
 Date: Monday, February 16, 2015 at 1:15 AM
 To: user@spark.apache.org user@spark.apache.org
 Subject: Which OutputCommitter to use for S3?

   Hi all,

  The default OutputCommitter used by RDD, which is FileOutputCommitter,
 seems to require moving files at the commit step, which is not a constant
 operation in S3, as discussed in
  http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
  People seem to develop their own NullOutputCommitter implementation or use
  DirectFileOutputCommitter (as mentioned in SPARK-3595,
  https://issues.apache.org/jira/browse/SPARK-3595),
 but I wanted to check if there is a de facto standard, publicly available
 OutputCommitter to use for S3 in conjunction with Spark.

  Thanks,
 Mingyu






Re: About FlumeUtils.createStream

2015-02-21 Thread Akhil Das
Spark won't listen on that port, mate. It basically means you have a flume
source running at that port on your localhost. And when you submit your
application in standalone mode, workers will consume data from that port.
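
For reference, the call in question looks like this (a minimal sketch; ssc
is the StreamingContext, and the port value is illustrative). As I
understand it, the push-based receiver binds host:port on the worker where
the receiver task happens to be scheduled, and the Flume avro sink connects
to it there:

    import org.apache.spark.streaming.flume.FlumeUtils

    val flumeStream = FlumeUtils.createStream(ssc, "localhost", 9999)   // illustrative port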

Thanks
Best Regards

On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com bit1...@163.com wrote:


 Hi,
  In the Spark Streaming application, I write the code
  FlumeUtils.createStream(ssc, localhost, <port>), which means Spark will
  listen on that port and wait for the Flume sink to write to it.
  My question is: when I submit the application to the Spark Standalone
  cluster, will the port be opened only on the driver machine, or will all
  the workers also open the port and wait for the Flume data?

 --




Re: randomSplit instead of a huge map reduce ?

2015-02-21 Thread Krishna Sankar
   - Divide and conquer with reduceByKey (like Ashish mentioned, each pair
   being the key) would work -- see the sketch below. It looks like a
   mapReduce-with-combiners problem; I think reduceByKey would use combiners
   while aggregateByKey wouldn't.
   - Could we optimize this further by using combineByKey directly?
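
A minimal sketch of the reduceByKey route (in Scala; the original code is
Java, sc is assumed, and docs stands in for the 72,000 word arrays):

    import org.apache.spark.SparkContext._   // pair-RDD functions in Spark 1.x

    val docs = sc.parallelize(Seq(Array("a", "b", "c")))   // illustrative data
    val pairCounts = docs.flatMap { words =>
      for {
        i <- 0 until words.length
        j <- (i + 1) until words.length
      } yield ((words(i), words(j)), 1)
    }.reduceByKey(_ + _)   // map-side combine shrinks the shuffle toward the unique pairs
    // pairCounts.collect() => Array(((a,b),1), ((a,c),1), ((b,c),1))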

Cheers
k/

On Fri, Feb 20, 2015 at 6:39 PM, Ashish Rangole arang...@gmail.com wrote:

 Is there a check you can put in place to not create pairs that aren't in
 your set of 20M pairs? Additionally, once you have your arrays converted to
 pairs you can do aggregateByKey with each pair being the key.
 On Feb 20, 2015 1:57 PM, shlomib shl...@summerhq.com wrote:

 Hi,

 I am new to Spark and I think I missed something very basic.

 I have the following use case (I use Java and run Spark locally on my
 laptop):


 I have a JavaRDD<String[]>

 - The RDD contains around 72,000 arrays of strings (String[])

 - Each array contains 80 words (on average).


 What I want to do is to convert each array into a new array/list of pairs,
 for example:

 Input: String[] words = ['a', 'b', 'c']

 Output: List[String, String] pairs = [('a', 'b'), ('a', 'c'), ('b', 'c')]

 and then I want to count the number of times each pair appeared, so my
 final
 output should be something like:

 Output: List[String, String, Integer] result = [('a', 'b', 3), ('a', 'c',
 8), ('b', 'c', 10)]


 The problem:

 Since each array contains around 80 words, it returns around 3,200 pairs,
 so
 after “mapping” my entire RDD I get 3,200 * 72,000 = *230,400,000* pairs
 to
 reduce which require way too much memory.

 (I know I have only around *20,000,000* unique pairs!)

 I already modified my code and used 'mapPartitions' instead of 'map'. It
 definitely improved the performance, but I still feel I'm doing something
 completely wrong.


 I was wondering if this is the right 'Spark way' to solve this kind of
 problem, or maybe I should do something like splitting my original RDD
 into
 smaller parts (by using randomSplit), then iterate over each part,
 aggregate
 the results into some result RDD (by using 'union') and move on to the
 next
 part.


 Can anyone please explain me which solution is better?


 Thank you very much,

 Shlomi.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/randomSplit-instead-of-a-huge-map-reduce-tp21744.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark Performance on Yarn

2015-02-21 Thread Davies Liu
How many executors do you have per machine? It would be helpful if you
could list all the configs.

Could you also try to run it without persist? Caching can hurt more than
help if you don't have enough memory.
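
For reference, a sketch of the kind of explicit sizing being discussed
(values are illustrative, not a recommendation):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.memory", "2g")                // executor heap
      .set("spark.yarn.executor.memoryOverhead", "512")  // MB of extra container headroom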

On Fri, Feb 20, 2015 at 5:18 PM, Lee Bierman leebier...@gmail.com wrote:
 Thanks for the suggestions.
 I'm experimenting with different values for spark memoryOverhead and
 explicitly giving the executors more memory, but still have not found the
 golden medium to get it to finish in a proper time frame.

 Is my cluster massively undersized at 5 boxes with 8 GB RAM and 2 CPUs each?
 Trying to figure out a memory setting and executor setting so it runs on
 many containers in parallel.

 I'm still struggling as pig jobs and hive jobs on the same whole data set
 don't take as long. I'm wondering too if the logic in our code is just doing
 something silly causing multiple reads of all the data.


 On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 If that's the error you're hitting, the fix is to boost
 spark.yarn.executor.memoryOverhead, which will put some extra room in
 between the executor heap sizes and the amount of memory requested for them
 from YARN.

 -Sandy

 On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote:

 A bit more context on this issue. From the container logs on the executor

 Given my cluster specs above what would be appropriate parameters to pass
 into:
 --num-executors --num-cores --executor-memory

 I had tried it with --executor-memory 2500MB

 2015-02-20 06:50:09,056 WARN

 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=23320,containerID=container_1423083596644_0238_01_004160]
 is
 running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB
 physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing
 container.
 Dump of the process-tree for container_1423083596644_0238_01_004160 :
 |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
 SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
 |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p'
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend

 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/CoarseGrainedScheduler
 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout
 2

 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
 |- 23323 23320 23320 23320 (java) 922271 12263 461976 724218
 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p
 -Xms2400m
 -Xmx2400m

 -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp

 -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.








Re: Force RDD evaluation

2015-02-21 Thread Sean Owen
I think the cheapest possible way to force materialization is something like

rdd.foreachPartition(i => None)
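
Wrapped in a timer, that looks like (assuming an existing RDD named rdd):

    val t0 = System.nanoTime()
    rdd.foreachPartition(_ => ())   // forces computation; nothing is collected
    println("took " + (System.nanoTime() - t0) / 1e6 + " ms")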

I get the use case, but as you can see there is a cost: you are forced
to materialize an RDD and cache it just to measure the computation
time. In principle this could be taking significantly more time than
not doing so, since otherwise several RDD stages might proceed without
ever even having to persist intermediate results in memory.

Consider looking at the Spark UI to see how much time a stage took,
although it's measuring end to end wall clock time, which may overlap
with other computations.

(or maybe you are disabling / enabling this logging for prod / test anyway)

On Sat, Feb 21, 2015 at 4:46 AM, pnpritchard
nicholas.pritch...@falkonry.com wrote:
 Is there a technique for forcing the evaluation of an RDD?

 I have used actions to do so but even the most basic count has a
 non-negligible cost (even on a cached RDD, repeated calls to count take
 time).

 My use case is for logging the execution time of the major components in my
 application. At the end of each component I have a statement like
 rdd.cache().count() and time how long it takes.

 Thanks in advance for any advice!
 Nick



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Force-RDD-evaluation-tp21748.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Use Spark Streaming for Batch?

2015-02-21 Thread Sean Owen
I agree with your assessment as to why it *doesn't* just work. I don't
think a small batch duration helps as all files it sees at the outset
are processed in one batch. Your timestamps are a user-space concept
not a framework concept.

However, there ought to be a great deal of reusability between the
two, so maybe a small refactoring lets you use 95% of it as-is.

Isn't the core of your job to process an RDD of timestamp+data
together with state to produce new state? If you have the pieces to do
that, you should be able to hook them into Spark Streaming via its
timestamp value and its updateStateByKey, but then just as easily
point this generic logic at an RDD of historical data and an empty
initial state?
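
Sketched very roughly (types simplified to running counts; events and
historical are assumed to exist):

    // shared core: fold one batch of values into the previous state
    def update(values: Seq[Int], prev: Option[Int]): Option[Int] =
      Some(prev.getOrElse(0) + values.sum)

    // streaming path, given events: DStream[(String, Int)]
    //   events.updateStateByKey(update _)

    // batch path, given historical: RDD[(String, Int)], with an empty initial state
    //   historical.groupByKey().mapValues(vs => update(vs.toSeq, None))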

On Sat, Feb 21, 2015 at 1:05 AM, craigv craigvanderbo...@gmail.com wrote:
 We have a sophisticated Spark Streaming application that we have been using
 successfully in production for over a year to process a time series of
 events.  Our application makes novel use of updateStateByKey() for state
 management.

 We now have the need to perform exactly the same processing on input data
 that's not real-time, but has been persisted to disk.  We do not want to
 rewrite our Spark Streaming app unless we have to.

 /Might it be possible to perform large-batch processing on HDFS time
 series data using Spark Streaming?/

 1. I understand that there is not currently an InputDStream that could do
 what's needed.  I would have to create such a thing.
 2. Time is a problem.  I would have to use the timestamps on our events for
 any time-based logic and state management
 3. The batch duration would become meaningless in this scenario.  Could I
 just set it to something really small (say 1 second) and then let it fall
 behind, processing the data as quickly as it could?

 It all seems possible.  But could Spark Streaming work this way?  If I
 created a DStream that delivered (say) months of events, could Spark
 Streaming effectively process this in a batch fashion?

 Any and all comments/ideas welcome!






 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Use-Spark-Streaming-for-Batch-tp21745.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Perf Prediction

2015-02-21 Thread Ted Yu
Can you be a bit more specific?

Are you asking about performance across Spark releases?

Cheers

On Sat, Feb 21, 2015 at 6:38 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Hi,
 Has some performance prediction work been done on Spark?

 Thank You




Re: Worker and Nodes

2015-02-21 Thread Deep Pradhan
In this case, I just wanted to know whether a single node cluster with
multiple workers acts like a simulator of a multi-node cluster. Say we have
a single node cluster with 10 workers; can we then tell that the same
behavior will take place on a cluster of 10 nodes? In other words, without
having the 10-node cluster, can I learn the behavior of the application on
a 10-node cluster by using a single node with 10 workers? The time taken
may vary, but I am talking about the behavior. Can we say that?

On Sat, Feb 21, 2015 at 8:21 PM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Yes, I am talking about standalone single node cluster.

 No, I am not increasing parallelism. I just wanted to know if it is
  natural. Does message passing across the workers account for the happening?

 I am running SparkKMeans, just to validate one prediction model. I am
 using several data sets. I have a standalone mode. I am varying the workers
 from 1 to 16

 On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote:

 I can imagine a few reasons. Adding workers might cause fewer tasks to
  execute locally (?) So you may be executing more remotely.

 Are you increasing parallelism? for trivial jobs, chopping them up
 further may cause you to pay more overhead of managing so many small
 tasks, for no speed up in execution time.

 Can you provide any more specifics though? you haven't said what
 you're running, what mode, how many workers, how long it takes, etc.

 On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:
  Hi,
  I have been running some jobs in my local single node stand alone
 cluster. I
  am varying the worker instances for the same job, and the time taken
 for the
  job to complete increases with increase in the number of workers. I
 repeated
  some experiments varying the number of nodes in a cluster too and the
 same
  behavior is seen.
  Can the idea of worker instances be extrapolated to the nodes in a
 cluster?
 
  Thank You





Re: Worker and Nodes

2015-02-21 Thread Frank Austin Nothaft
There could be many different things causing this. For example, if you only 
have a single partition of data, increasing the number of tasks will only 
increase execution time due to higher scheduling overhead. Additionally, how 
large is a single partition in your application relative to the amount of 
memory on the machine? If you are running on a machine with a small amount of 
memory, increasing the number of executors per machine may increase GC/memory 
pressure. On a single node, since your executors share a memory and I/O system, 
you could just thrash everything.

In any case, you can’t normally generalize between increased parallelism on a 
single node and increased parallelism across a cluster. If you are purely 
limited by CPU, then yes, you can normally make that generalization. However, 
when you increase the number of workers in a cluster, you are providing your 
app with more resources (memory capacity and bandwidth, and disk bandwidth). 
When you increase the number of tasks executing on a single node, you do not 
increase the pool of available resources.

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Feb 21, 2015, at 4:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:

 No, I just have a single node standalone cluster.
 
 I am not tweaking around with the code to increase parallelism. I am just 
 running SparkKMeans that is there in Spark-1.0.0
 I just wanted to know, if this behavior is natural. And if so, what causes 
 this?
 
 Thank you
 
 On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote:
 What's your storage like? are you adding worker machines that are
 remote from where the data lives? I wonder if it just means you are
 spending more and more time sending the data over the network as you
 try to ship more of it to more remote workers.
 
  To answer your question, no, in general more workers means more
 parallelism and therefore faster execution. But that depends on a lot
  of things. For example, if your process isn't parallelized to use all
 available execution slots, adding more slots doesn't do anything.
 
 On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com 
 wrote:
  Yes, I am talking about standalone single node cluster.
 
  No, I am not increasing parallelism. I just wanted to know if it is natural.
  Does message passing across the workers account for the happening?
 
  I am running SparkKMeans, just to validate one prediction model. I am using
  several data sets. I have a standalone mode. I am varying the workers from 1
  to 16
 
  On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote:
 
  I can imagine a few reasons. Adding workers might cause fewer tasks to
  execute locally (?) So you may be executing more remotely.
 
  Are you increasing parallelism? for trivial jobs, chopping them up
  further may cause you to pay more overhead of managing so many small
  tasks, for no speed up in execution time.
 
  Can you provide any more specifics though? you haven't said what
  you're running, what mode, how many workers, how long it takes, etc.
 
  On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com
  wrote:
   Hi,
   I have been running some jobs in my local single node stand alone
   cluster. I
   am varying the worker instances for the same job, and the time taken for
   the
   job to complete increases with increase in the number of workers. I
   repeated
   some experiments varying the number of nodes in a cluster too and the
   same
   behavior is seen.
   Can the idea of worker instances be extrapolated to the nodes in a
   cluster?
  
   Thank You
 
 
 



Re: Worker and Nodes

2015-02-21 Thread Deep Pradhan
So, with the increase in the number of worker instances, if I also increase
the degree of parallelism, will it make any difference?
I can use this model the other way round too, right? That is, I can always
predict the deterioration in an app's performance as the number of worker
instances increases, right?

Thank You

On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Yes, I have decreased the executor memory.
 But, if I have to do this, then I have to tweak around with the code
 corresponding to each configuration right?

 On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote:

 "Workers" has a specific meaning in Spark. You are running many on one
 machine? that's possible but not usual.

 Each worker's executors have access to a fraction of your machine's
 resources then. If you're not increasing parallelism, maybe you're not
 actually using additional workers, so are using less resource for your
 problem.

 Or because the resulting executors are smaller, maybe you're hitting
 GC thrashing in these executors with smaller heaps.

 Or if you're not actually configuring the executors to use less
 memory, maybe you're over-committing your RAM and swapping?

 Bottom line, you wouldn't use multiple workers on one small standalone
 node. This isn't a good way to estimate performance on a distributed
 cluster either.

 On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:
  No, I just have a single node standalone cluster.
 
  I am not tweaking around with the code to increase parallelism. I am
 just
  running SparkKMeans that is there in Spark-1.0.0
  I just wanted to know, if this behavior is natural. And if so, what
 causes
  this?
 
  Thank you
 
  On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote:
 
  What's your storage like? are you adding worker machines that are
  remote from where the data lives? I wonder if it just means you are
  spending more and more time sending the data over the network as you
  try to ship more of it to more remote workers.
 
   To answer your question, no, in general more workers means more
  parallelism and therefore faster execution. But that depends on a lot
   of things. For example, if your process isn't parallelized to use all
  available execution slots, adding more slots doesn't do anything.
 
  On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan 
 pradhandeep1...@gmail.com
  wrote:
   Yes, I am talking about standalone single node cluster.
  
   No, I am not increasing parallelism. I just wanted to know if it is
   natural.
    Does message passing across the workers account for the happening?
  
   I am running SparkKMeans, just to validate one prediction model. I am
   using
   several data sets. I have a standalone mode. I am varying the workers
   from 1
   to 16
  
   On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com
 wrote:
  
   I can imagine a few reasons. Adding workers might cause fewer tasks
 to
    execute locally (?) So you may be executing more remotely.
  
   Are you increasing parallelism? for trivial jobs, chopping them up
   further may cause you to pay more overhead of managing so many small
   tasks, for no speed up in execution time.
  
   Can you provide any more specifics though? you haven't said what
   you're running, what mode, how many workers, how long it takes, etc.
  
   On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan
   pradhandeep1...@gmail.com
   wrote:
Hi,
I have been running some jobs in my local single node stand alone
cluster. I
am varying the worker instances for the same job, and the time
 taken
for
the
job to complete increases with increase in the number of workers.
 I
repeated
some experiments varying the number of nodes in a cluster too and
 the
same
behavior is seen.
Can the idea of worker instances be extrapolated to the nodes in a
cluster?
   
Thank You
  
  
 
 





Re: Worker and Nodes

2015-02-21 Thread Deep Pradhan
Yes, I am talking about standalone single node cluster.

No, I am not increasing parallelism. I just wanted to know if it is
natural. Does message passing across the workers account for the happening?

I am running SparkKMeans, just to validate one prediction model. I am using
several data sets. I have a standalone mode. I am varying the workers from
1 to 16

On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote:

 I can imagine a few reasons. Adding workers might cause fewer tasks to
  execute locally (?) So you may be executing more remotely.

 Are you increasing parallelism? for trivial jobs, chopping them up
 further may cause you to pay more overhead of managing so many small
 tasks, for no speed up in execution time.

 Can you provide any more specifics though? you haven't said what
 you're running, what mode, how many workers, how long it takes, etc.

 On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:
  Hi,
  I have been running some jobs in my local single node stand alone
 cluster. I
  am varying the worker instances for the same job, and the time taken for
 the
  job to complete increases with increase in the number of workers. I
 repeated
  some experiments varying the number of nodes in a cluster too and the
 same
  behavior is seen.
  Can the idea of worker instances be extrapolated to the nodes in a
 cluster?
 
  Thank You



Re: Perf Prediction

2015-02-21 Thread Deep Pradhan
No, I am talking about work parallel to the prediction work done on GPUs.
Say, given data for a smaller number of nodes in a Spark cluster, the
prediction needs to estimate the time the application would take when we
have a larger number of nodes.

On Sat, Feb 21, 2015 at 8:22 PM, Ted Yu yuzhih...@gmail.com wrote:

  Can you be a bit more specific?

  Are you asking about performance across Spark releases?

 Cheers

 On Sat, Feb 21, 2015 at 6:38 AM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 Hi,
 Has some performance prediction work been done on Spark?

 Thank You





Re: Worker and Nodes

2015-02-21 Thread Sean Owen
What's your storage like? are you adding worker machines that are
remote from where the data lives? I wonder if it just means you are
spending more and more time sending the data over the network as you
try to ship more of it to more remote workers.

To answer your question, no, in general more workers means more
parallelism and therefore faster execution. But that depends on a lot
of things. For example, if your process isn't parallelized to use all
available execution slots, adding more slots doesn't do anything.

On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
 Yes, I am talking about standalone single node cluster.

 No, I am not increasing parallelism. I just wanted to know if it is natural.
  Does message passing across the workers account for the happening?

 I am running SparkKMeans, just to validate one prediction model. I am using
 several data sets. I have a standalone mode. I am varying the workers from 1
 to 16

 On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote:

 I can imagine a few reasons. Adding workers might cause fewer tasks to
  execute locally (?) So you may be executing more remotely.

 Are you increasing parallelism? for trivial jobs, chopping them up
 further may cause you to pay more overhead of managing so many small
 tasks, for no speed up in execution time.

 Can you provide any more specifics though? you haven't said what
 you're running, what mode, how many workers, how long it takes, etc.

 On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:
  Hi,
  I have been running some jobs in my local single node stand alone
  cluster. I
  am varying the worker instances for the same job, and the time taken for
  the
  job to complete increases with increase in the number of workers. I
  repeated
  some experiments varying the number of nodes in a cluster too and the
  same
  behavior is seen.
  Can the idea of worker instances be extrapolated to the nodes in a
  cluster?
 
  Thank You






Re: Query data in Spark RRD

2015-02-21 Thread Ted Yu
Have you looked at
http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
?

Cheers

On Sat, Feb 21, 2015 at 4:24 AM, Nikhil Bafna nikhil.ba...@flipkart.com
wrote:


 Hi.

 My use case is building a realtime monitoring system over
 multi-dimensional data.

 The way I'm planning to go about it is to use Spark Streaming to store
 aggregated count over all dimensions in 10 sec interval.

 Then, from a dashboard, I would be able to specify a query over some
 dimensions, which will need re-aggregation from the already computed job.

 My query is, how can I run dynamic queries over data in schema RDDs?

 --
 Nikhil Bafna



Re: Worker and Nodes

2015-02-21 Thread Deep Pradhan
No, I just have a single node standalone cluster.

I am not tweaking around with the code to increase parallelism. I am just
running SparkKMeans that is there in Spark-1.0.0
I just wanted to know, if this behavior is natural. And if so, what causes
this?

Thank you

On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote:

 What's your storage like? are you adding worker machines that are
 remote from where the data lives? I wonder if it just means you are
 spending more and more time sending the data over the network as you
 try to ship more of it to more remote workers.

  To answer your question, no, in general more workers means more
 parallelism and therefore faster execution. But that depends on a lot
  of things. For example, if your process isn't parallelized to use all
 available execution slots, adding more slots doesn't do anything.

 On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:
  Yes, I am talking about standalone single node cluster.
 
  No, I am not increasing parallelism. I just wanted to know if it is
 natural.
   Does message passing across the workers account for the happening?
 
  I am running SparkKMeans, just to validate one prediction model. I am
 using
  several data sets. I have a standalone mode. I am varying the workers
 from 1
  to 16
 
  On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote:
 
  I can imagine a few reasons. Adding workers might cause fewer tasks to
   execute locally (?) So you may be executing more remotely.
 
  Are you increasing parallelism? for trivial jobs, chopping them up
  further may cause you to pay more overhead of managing so many small
  tasks, for no speed up in execution time.
 
  Can you provide any more specifics though? you haven't said what
  you're running, what mode, how many workers, how long it takes, etc.
 
  On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan 
 pradhandeep1...@gmail.com
  wrote:
   Hi,
   I have been running some jobs in my local single node stand alone
   cluster. I
   am varying the worker instances for the same job, and the time taken
 for
   the
   job to complete increases with increase in the number of workers. I
   repeated
   some experiments varying the number of nodes in a cluster too and the
   same
   behavior is seen.
   Can the idea of worker instances be extrapolated to the nodes in a
   cluster?
  
   Thank You
 
 



Re: Worker and Nodes

2015-02-21 Thread Sean Owen
"Workers" has a specific meaning in Spark. You are running many on one
machine? that's possible but not usual.

Each worker's executors have access to a fraction of your machine's
resources then. If you're not increasing parallelism, maybe you're not
actually using additional workers, so are using less resource for your
problem.

Or because the resulting executors are smaller, maybe you're hitting
GC thrashing in these executors with smaller heaps.

Or if you're not actually configuring the executors to use less
memory, maybe you're over-committing your RAM and swapping?

Bottom line, you wouldn't use multiple workers on one small standalone
node. This isn't a good way to estimate performance on a distributed
cluster either.

On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
 No, I just have a single node standalone cluster.

 I am not tweaking around with the code to increase parallelism. I am just
 running SparkKMeans that is there in Spark-1.0.0
 I just wanted to know, if this behavior is natural. And if so, what causes
 this?

 Thank you

 On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote:

 What's your storage like? are you adding worker machines that are
 remote from where the data lives? I wonder if it just means you are
 spending more and more time sending the data over the network as you
 try to ship more of it to more remote workers.

  To answer your question, no, in general more workers means more
 parallelism and therefore faster execution. But that depends on a lot
  of things. For example, if your process isn't parallelized to use all
 available execution slots, adding more slots doesn't do anything.

 On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:
  Yes, I am talking about standalone single node cluster.
 
  No, I am not increasing parallelism. I just wanted to know if it is
  natural.
   Does message passing across the workers account for the happening?
 
  I am running SparkKMeans, just to validate one prediction model. I am
  using
  several data sets. I have a standalone mode. I am varying the workers
  from 1
  to 16
 
  On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com wrote:
 
  I can imagine a few reasons. Adding workers might cause fewer tasks to
   execute locally (?) So you may be executing more remotely.
 
  Are you increasing parallelism? for trivial jobs, chopping them up
  further may cause you to pay more overhead of managing so many small
  tasks, for no speed up in execution time.
 
  Can you provide any more specifics though? you haven't said what
  you're running, what mode, how many workers, how long it takes, etc.
 
  On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan
  pradhandeep1...@gmail.com
  wrote:
   Hi,
   I have been running some jobs in my local single node stand alone
   cluster. I
   am varying the worker instances for the same job, and the time taken
   for
   the
   job to complete increases with increase in the number of workers. I
   repeated
   some experiments varying the number of nodes in a cluster too and the
   same
   behavior is seen.
   Can the idea of worker instances be extrapolated to the nodes in a
   cluster?
  
   Thank You
 
 






Re: Worker and Nodes

2015-02-21 Thread Deep Pradhan
Yes, I have decreased the executor memory.
But, if I have to do this, then I have to tweak around with the code
corresponding to each configuration right?

On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote:

 "Workers" has a specific meaning in Spark. You are running many on one
 machine? that's possible but not usual.

 Each worker's executors have access to a fraction of your machine's
 resources then. If you're not increasing parallelism, maybe you're not
 actually using additional workers, so are using less resource for your
 problem.

 Or because the resulting executors are smaller, maybe you're hitting
 GC thrashing in these executors with smaller heaps.

 Or if you're not actually configuring the executors to use less
 memory, maybe you're over-committing your RAM and swapping?

 Bottom line, you wouldn't use multiple workers on one small standalone
 node. This isn't a good way to estimate performance on a distributed
 cluster either.

 On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:
  No, I just have a single node standalone cluster.
 
  I am not tweaking around with the code to increase parallelism. I am just
  running SparkKMeans that is there in Spark-1.0.0
  I just wanted to know, if this behavior is natural. And if so, what
 causes
  this?
 
  Thank you
 
  On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote:
 
  What's your storage like? are you adding worker machines that are
  remote from where the data lives? I wonder if it just means you are
  spending more and more time sending the data over the network as you
  try to ship more of it to more remote workers.
 
   To answer your question, no, in general more workers means more
  parallelism and therefore faster execution. But that depends on a lot
   of things. For example, if your process isn't parallelized to use all
  available execution slots, adding more slots doesn't do anything.
 
  On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan 
 pradhandeep1...@gmail.com
  wrote:
   Yes, I am talking about standalone single node cluster.
  
   No, I am not increasing parallelism. I just wanted to know if it is
   natural.
    Does message passing across the workers account for the happening?
  
   I am running SparkKMeans, just to validate one prediction model. I am
   using
   several data sets. I have a standalone mode. I am varying the workers
   from 1
   to 16
  
   On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com
 wrote:
  
   I can imagine a few reasons. Adding workers might cause fewer tasks
 to
    execute locally (?) So you may be executing more remotely.
  
   Are you increasing parallelism? for trivial jobs, chopping them up
   further may cause you to pay more overhead of managing so many small
   tasks, for no speed up in execution time.
  
    Can you provide any more specifics, though? You haven't said what
    you're running, what mode, how many workers, how long it takes, etc.
  
   On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan
   pradhandeep1...@gmail.com
   wrote:
Hi,
I have been running some jobs in my local single-node standalone
cluster. I am varying the worker instances for the same job, and the
time taken for the job to complete increases with the number of
workers. I repeated some experiments varying the number of nodes in a
cluster too, and the same behavior is seen.
Can the idea of worker instances be extrapolated to the nodes in a
cluster?
   
Thank You
  
  
 
 

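For reference, the multiple-workers-on-one-node setup discussed above is
configured in conf/spark-env.sh. A minimal sketch with illustrative values
only, assuming a hypothetical 16-core, 16 GB node:

  export SPARK_WORKER_INSTANCES=4   # start 4 worker daemons on this one node
  export SPARK_WORKER_CORES=4       # each worker may use 4 cores
  export SPARK_WORKER_MEMORY=4g     # each worker may use 4 GB of RAM

As Sean notes above, splitting one machine this way adds daemons and
inter-executor communication, not resources.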


Re: java.io.IOException: Filesystem closed

2015-02-21 Thread Kartheek.R
Are you replicating any RDDs?
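
For context, "replicating an RDD" means persisting it with one of the *_2
storage levels, which keep each partition on two executors. A minimal
sketch (the input path is hypothetical):

  import org.apache.spark.storage.StorageLevel

  val lines = sc.textFile("hdfs:///data/input")
  lines.persist(StorageLevel.MEMORY_AND_DISK_2)  // two copies of each partition
  lines.count()                                  // materializes the cache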







Re: Worker and Nodes

2015-02-21 Thread Deep Pradhan
So, if I keep the number of instances constant and increase the degree of
parallelism in steps, can I expect the performance to increase?

Thank You

On Sat, Feb 21, 2015 at 9:07 PM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 So, with the increase in the number of worker instances, if I also
 increase the degree of parallelism, will it make any difference?
 I can use this model the other way round too, right? I can always predict
 that, as the number of worker instances increases, an app's performance
 will deteriorate, right?

 Thank You

 On Sat, Feb 21, 2015 at 8:52 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 Yes, I have decreased the executor memory.
 But, if I have to do this, then I have to tweak the code for each
 configuration, right?

 On Sat, Feb 21, 2015 at 8:47 PM, Sean Owen so...@cloudera.com wrote:

 "Workers" has a specific meaning in Spark. You are running many on one
 machine? That's possible, but not usual.

 Each worker's executors then have access to only a fraction of your
 machine's resources. If you're not increasing parallelism, maybe you're
 not actually using the additional workers, so you are using fewer
 resources for your problem.

 Or because the resulting executors are smaller, maybe you're hitting
 GC thrashing in these executors with smaller heaps.

 Or if you're not actually configuring the executors to use less
 memory, maybe you're over-committing your RAM and swapping?

 Bottom line, you wouldn't use multiple workers on one small standalone
 node. This isn't a good way to estimate performance on a distributed
 cluster either.

 On Sat, Feb 21, 2015 at 3:11 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:
  No, I just have a single-node standalone cluster.

  I am not tweaking the code to increase parallelism. I am just running
  the SparkKMeans example that ships with Spark 1.0.0.
  I just wanted to know if this behavior is natural, and if so, what
  causes it.
 
  Thank you
 
  On Sat, Feb 21, 2015 at 8:32 PM, Sean Owen so...@cloudera.com wrote:
 
  What's your storage like? Are you adding worker machines that are
  remote from where the data lives? I wonder if it just means you are
  spending more and more time sending the data over the network as you
  try to ship more of it to more remote workers.

  To answer your question: no, in general more workers means more
  parallelism and therefore faster execution. But that depends on a lot
  of things. For example, if your process isn't parallelized to use all
  available execution slots, adding more slots doesn't do anything.
 
  On Sat, Feb 21, 2015 at 2:51 PM, Deep Pradhan 
 pradhandeep1...@gmail.com
  wrote:
   Yes, I am talking about a standalone single-node cluster.

   No, I am not increasing parallelism. I just wanted to know if this is
   natural. Does message passing across the workers account for what is
   happening?

   I am running SparkKMeans, just to validate one prediction model. I am
   using several data sets in standalone mode, varying the workers from 1
   to 16.
  
   On Sat, Feb 21, 2015 at 8:14 PM, Sean Owen so...@cloudera.com
 wrote:
  
    I can imagine a few reasons. Adding workers might cause fewer tasks
    to execute locally (?), so you may be executing more remotely.

    Are you increasing parallelism? For trivial jobs, chopping them up
    further may cause you to pay more overhead in managing so many small
    tasks, for no speed-up in execution time.
  
    Can you provide any more specifics, though? You haven't said what
    you're running, what mode, how many workers, how long it takes, etc.
  
   On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan
   pradhandeep1...@gmail.com
   wrote:
Hi,
I have been running some jobs in my local single-node standalone
cluster. I am varying the worker instances for the same job, and the
time taken for the job to complete increases with the number of
workers. I repeated some experiments varying the number of nodes in a
cluster too, and the same behavior is seen.
Can the idea of worker instances be extrapolated to the nodes in a
cluster?
   
Thank You
  
  
 
 




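For reference, a minimal sketch of what "increasing the degree of
parallelism in steps" can look like in code; the path and partition counts
are hypothetical:

  val points = sc.textFile("hdfs:///data/points", 32)  // request 32 partitions at load
  val finer = points.repartition(64)                   // reshuffle into 64 partitions
  println(finer.partitions.size)                       // verify the partition count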


Missing shuffle files

2015-02-21 Thread Anders Arpteg
For large jobs, the following error message is shown, which seems to
indicate that shuffle files are missing for some reason. It's a rather
large job with many partitions. If the data size is reduced, the problem
disappears. I'm running a build from Spark master post-1.2 (built
2015-01-16) on Yarn 2.2. Any idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task 450 in
stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage
450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
java.io.FileNotFoundException:
/disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
(No such file or directory)
 at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
 at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
 at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
 at
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
 at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
 at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:745)

TIA,
Anders


Re: Missing shuffle files

2015-02-21 Thread Corey Nolet
I'm experiencing the same issue. Upon closer inspection I'm noticing that
executors are being lost as well. Thing is, I can't figure out how they are
dying. I'm using MEMORY_AND_DISK_SER and I've got over 1.3 TB of memory
allocated for the application. I was thinking perhaps it was possible that
a single executor was getting one or a couple of large partitions, but
shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:

 For large jobs, the following error message is shown, which seems to
 indicate that shuffle files are missing for some reason. It's a rather
 large job with many partitions. If the data size is reduced, the problem
 disappears. I'm running a build from Spark master post-1.2 (built
 2015-01-16) on Yarn 2.2. Any idea of how to resolve this problem?

 User class threw exception: Job aborted due to stage failure: Task 450 in
 stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage
 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
 java.io.FileNotFoundException:
 /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
  at
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
  at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

  at java.lang.Thread.run(Thread.java:745)

 TIA,
 Anders


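For context, a minimal sketch of the persistence level Corey mentions; the
dataset here is a stand-in. MEMORY_AND_DISK_SER keeps partitions serialized
in memory and spills them to disk when memory runs short:

  import org.apache.spark.storage.StorageLevel

  val rdd = sc.parallelize(1 to 1000000)
  rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
  rdd.count()  // materializes (and caches) the RDD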


Query data in Spark RDD

2015-02-21 Thread Nikhil Bafna
Hi.

My use case is building a realtime monitoring system over multi-dimensional
data.

The way I'm planning to go about it is to use Spark Streaming to store
aggregated counts over all dimensions at 10-second intervals.

Then, from a dashboard, I would be able to specify a query over some
dimensions, which will need re-aggregation of the already computed results.

My question is: how can I run dynamic queries over data in schema RDDs?

--
Nikhil Bafna
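
One possible approach, sketched against the Spark 1.2-era SQL API (all
table, column, and class names are hypothetical): register the aggregated
data as a temp table, then build the SQL string from the dashboard's
selections.

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.SQLContext

  case class DimCount(dim1: String, dim2: String, n: Long)

  val sc = new SparkContext("local[*]", "dynamic-queries")
  val sqlContext = new SQLContext(sc)
  import sqlContext.createSchemaRDD  // implicit RDD -> SchemaRDD conversion

  // Stand-in for the counts produced by the streaming job.
  val counts = sc.parallelize(Seq(DimCount("us", "web", 10L), DimCount("eu", "app", 7L)))
  counts.registerTempTable("counts")

  // The SQL string could be assembled from the dashboard's dimension list.
  val result = sqlContext.sql("SELECT dim1, SUM(n) AS total FROM counts GROUP BY dim1")
  result.collect().foreach(println)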


Perf Prediction

2015-02-21 Thread Deep Pradhan
Hi,
Has any performance prediction work been done on Spark?

Thank You


Re: Worker and Nodes

2015-02-21 Thread Sean Owen
I can imagine a few reasons. Adding workers might cause fewer tasks to
execute locally (?), so you may be executing more remotely.

Are you increasing parallelism? For trivial jobs, chopping them up
further may cause you to pay more overhead in managing so many small
tasks, for no speed-up in execution time.

Can you provide any more specifics, though? You haven't said what
you're running, what mode, how many workers, how long it takes, etc.

On Sat, Feb 21, 2015 at 2:37 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
 Hi,
 I have been running some jobs in my local single-node standalone cluster.
 I am varying the worker instances for the same job, and the time taken for
 the job to complete increases with the number of workers. I repeated some
 experiments varying the number of nodes in a cluster too, and the same
 behavior is seen.
 Can the idea of worker instances be extrapolated to the nodes in a cluster?

 Thank You


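A quick way to check Sean's point about execution slots — whether the job
actually has enough partitions to occupy every core — is to compare the
RDD's partition count with the cluster's parallelism (path hypothetical):

  val input = sc.textFile("hdfs:///data/input")
  println(s"partitions = ${input.partitions.size}")
  println(s"default parallelism = ${sc.defaultParallelism}")
  // If partitions < total cores, some slots stay idle; repartition() can help.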


Worker and Nodes

2015-02-21 Thread Deep Pradhan
Hi,
I have been running some jobs in my local single-node standalone cluster.
I am varying the worker instances for the same job, and the time taken for
the job to complete increases with the number of workers. I repeated some
experiments varying the number of nodes in a cluster too, and the same
behavior is seen.
Can the idea of worker instances be extrapolated to the nodes in a cluster?

Thank You


Re: Worker and Nodes

2015-02-21 Thread Yiannis Gkoufas
Hi,

I have experienced the same behavior. You are talking about standalone
cluster mode, right?

BR

On 21 February 2015 at 14:37, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Hi,
 I have been running some jobs in my local single-node standalone cluster.
 I am varying the worker instances for the same job, and the time taken for
 the job to complete increases with the number of workers. I repeated some
 experiments varying the number of nodes in a cluster too, and the same
 behavior is seen.
 Can the idea of worker instances be extrapolated to the nodes in a cluster?

 Thank You



Re: Posting to the list

2015-02-21 Thread Petar Zecevic


The message went through after all. Sorry for spamming.


On 21.2.2015. 21:27, pzecevic wrote:

Hi Spark users.

Does anybody know what steps are required to be able to post to this
list by sending an email to user@spark.apache.org? I just sent a reply to
Corey Nolet's mail "Missing shuffle files" but I don't think it was
accepted by the engine.

If I look at the Spark user list, I don't see this topic ("Missing shuffle
files") at all: http://apache-spark-user-list.1001560.n3.nabble.com/

I can see it in the archives, though:
https://mail-archives.apache.org/mod_mbox/spark-user/201502.mbox/browser
but my answer is not there.

This is not the first time this has happened and I am wondering what is
going on. Is the engine eating my emails? Does it not like me?
I am subscribed to the list and I have a Nabble account.
I previously saw one of my emails marked with "This message has not been
accepted by the mailing list yet." I read what that means, but I don't
think it applies to me.

What am I missing?

P.S.: I am posting this through the Nabble web interface. Hope it gets
through...








RE: Spark performance tuning

2015-02-21 Thread java8964
Can someone share some ideas about how to tune the GC time?
Thanks

From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Spark performance tuning
Date: Fri, 20 Feb 2015 16:04:23 -0500




Hi,
I am new to Spark, and I am trying to test Spark SQL performance vs. Hive.
I set up a standalone box with 24 cores and 64 GB of memory.
We have one SQL query in mind to test. Here is the basic setup on this one
box for the SQL we are trying to run:
1) Dataset 1: a 6.6 GB AVRO file with snappy compression, which contains a
   nested structure of 3 arrays of struct in AVRO
2) Dataset 2: a 5 GB AVRO file with snappy compression
3) Dataset 3: a 2.3 MB AVRO file with snappy compression
The basic structure of the query is like this:

(select xxx from dataset1
 lateral view outer explode(struct1)
 lateral view outer explode(struct2)
 where x...)
left outer join
(select ... from dataset2 lateral view explode(xxx) where ...)
on ...
left outer join
(select xxx from dataset3 where ...)
on x...

So overall it does two outer explodes on dataset 1, a left outer join with
an explode of dataset 2, and finally a left outer join with dataset 3.
On this standalone box, I installed Hadoop 2.2, Hive 0.12, and Spark 1.2.0.
As a baseline, the above query finishes in around 50 minutes in Hive 0.12,
with 6 mappers and 3 reducers, each with a 1 GB max heap, in 3 rounds of MR
jobs.
This is a very expensive query that runs in our production every day, of
course with a much bigger data set. Now I want to see how fast Spark can
run the same query.
I am using the following settings, based on my understanding of Spark, for
a fair test between it and Hive:

export SPARK_WORKER_MEMORY=32g
export SPARK_DRIVER_MEMORY=2g
--executor-memory 9g --total-executor-cores 9

I am trying to run one executor with 9 cores and a max 9 GB heap, to make
Spark use almost the same resources we gave to MapReduce. Here is the
result without any additional configuration changes, running under Spark
1.2.0, using HiveContext in Spark SQL, to run exactly the same query:
The Spark SQL query generated 5 stages of tasks, shown below:

Stage  Description                         Submitted            Duration  Tasks    Input   Shuffle Write
4      collect at SparkPlan.scala:84       2015/02/20 10:48:46  26 s      200/200
3      mapPartitions at Exchange.scala:64  2015/02/20 10:32:07  16 min    200/200          1112.3 MB
2      mapPartitions at Exchange.scala:64  2015/02/20 10:22:06  9 min     40/40    4.7 GB  22.2 GB
1      mapPartitions at Exchange.scala:64  2015/02/20 10:22:06  1.9 min   50/50    6.2 GB  2.8 GB
0      mapPartitions at Exchange.scala:64  2015/02/20 10:22:06  6 s       2/2      2.3 MB  156.6 KB
So the wall time of the whole query is 26 s + 16 min + 9 min + 2 min + 6 s,
around 28 minutes.
That is about 56% of the original time; not bad. But I want to know whether
any tuning of Spark can make it even faster.
For stages 2 and 3, I observed that GC time becomes more and more
expensive, especially in stage 3, shown below.

For stage 3:

Metric         Min     25th percentile  Median  75th percentile  Max
Duration       20 s    30 s             35 s    39 s             2.4 min
GC Time        9 s     17 s             20 s    25 s             2.2 min
Shuffle Write  4.7 MB  4.9 MB           5.2 MB  6.1 MB           8.3 MB

So at the median, GC took 20 s / 35 s = 57% of the task time.
The first change I made was to add the following line to spark-defaults.conf:

spark.serializer org.apache.spark.serializer.KryoSerializer

My assumption is that using KryoSerializer, instead of the default Java
serialization, will lower the memory footprint, which should lower the GC
pressure during runtime. I know I changed the correct spark-defaults.conf,
because if I add
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
in the same file, I see the GC activity in the stdout file. Of course, in
this test I didn't add that, as I want to make only one change at a time.
The result is almost the same as with the standard Java serialization: the
wall time is still 28 minutes, and in stage 3 GC still took around 50 to
60% of the time, with almost the same min, median, and max as before, and
no noticeable performance gain.
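
One thing worth checking on the Kryo result (a hedged sketch; the record
class is hypothetical): Kryo mainly shrinks shuffled and cached data, and
it gains much less when classes are not registered, since it then writes
full class names with every record:

  import org.apache.spark.{SparkConf, SparkContext}

  case class Record(id: Long, payload: String)  // hypothetical record type

  val conf = new SparkConf()
    .setAppName("kryo-registration-example")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .registerKryoClasses(Array(classOf[Record]))  // available since Spark 1.2
  val sc = new SparkContext(conf)
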
Next, based on my understanding, and for this test, I think the default
spark.storage.memoryFraction is too high for this query: there is no reason
to reserve so much memory for caching data, because we don't reuse any
dataset within this one query. So I added
--conf spark.storage.memoryFraction=0.3
at the end of the spark-shell command, to reserve about half as much memory
for caching data as the first time. Of course, this time I rolled back the
first change (KryoSerializer).
The result looks almost the same: the whole query finished in around 28 s +
14 min + 9.6 min + 1.9 min + 6 s, about 27 minutes.
It looks like Spark is faster than Hive, but are there any steps I can take
to make it even faster? Why does using KryoSerializer make no difference?
If I want to use the same resources as now, is there anything I can do to
speed it up?

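On the GC question above, a hedged sketch of spark-defaults.conf settings
that are commonly tried for shuffle-heavy queries like this one; the values
are illustrative and the effect is workload-dependent:

  spark.executor.extraJavaOptions  -XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
  spark.storage.memoryFraction     0.3
  spark.shuffle.memoryFraction     0.4

Lowering the storage fraction and raising the shuffle fraction shifts heap
from caching (unused in a single-pass query) toward shuffle aggregation,
which can reduce spilling and GC pressure.
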
Posting to the list

2015-02-21 Thread pzecevic
Hi Spark users.

Does anybody know what steps are required to be able to post to this
list by sending an email to user@spark.apache.org? I just sent a reply to
Corey Nolet's mail "Missing shuffle files" but I don't think it was
accepted by the engine.

If I look at the Spark user list, I don't see this topic ("Missing shuffle
files") at all: http://apache-spark-user-list.1001560.n3.nabble.com/

I can see it in the archives, though:
https://mail-archives.apache.org/mod_mbox/spark-user/201502.mbox/browser
but my answer is not there.

This is not the first time this has happened and I am wondering what is
going on. Is the engine eating my emails? Does it not like me?
I am subscribed to the list and I have a Nabble account.
I previously saw one of my emails marked with "This message has not been
accepted by the mailing list yet." I read what that means, but I don't
think it applies to me.

What am I missing?

P.S.: I am posting this through the Nabble web interface. Hope it gets
through...







Re: Missing shuffle files

2015-02-21 Thread Petar Zecevic


Could you try to turn on the external shuffle service?

spark.shuffle.service.enabled true


On 21.2.2015. 17:50, Corey Nolet wrote:
I'm experiencing the same issue. Upon closer inspection I'm noticing
that executors are being lost as well. Thing is, I can't figure out
how they are dying. I'm using MEMORY_AND_DISK_SER and I've got over
1.3 TB of memory allocated for the application. I was thinking perhaps
it was possible that a single executor was getting one or a couple of
large partitions, but shouldn't the disk persistence kick in at
that point?


On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:


For large jobs, the following error message is shown, which seems to
indicate that shuffle files are missing for some reason. It's a
rather large job with many partitions. If the data size is
reduced, the problem disappears. I'm running a build from Spark
master post-1.2 (built 2015-01-16) on Yarn 2.2. Any
idea of how to resolve this problem?

User class threw exception: Job aborted due to stage failure: Task
450 in stage 450.1 failed 4 times, most recent failure: Lost task
450.3 in stage 450.1 (TID 167370,
lon4-hadoopslave-b77.lon4.spotify.net):
java.io.FileNotFoundException:

/disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
(No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
at
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
at
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)

at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

TIA,
Anders

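For reference, a hedged sketch of enabling the external shuffle service
Petar suggests, for Spark 1.2+ on YARN (verify the exact property and
service names against your Spark version):

  # spark-defaults.conf
  spark.shuffle.service.enabled  true

  # yarn-site.xml, on each NodeManager
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle,spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>

With the service enabled, shuffle files are served by the NodeManager
rather than the executor, so they survive executor loss.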





Re: Which OutputCommitter to use for S3?

2015-02-21 Thread Andrew Ash
Josh, is that class something you guys would consider open-sourcing, or
would you rather the community step up and create an OutputCommitter
implementation optimized for S3?

On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote:

 We (Databricks) use our own DirectOutputCommitter implementation, which is
 a couple of tens of lines of Scala code. The class would almost entirely
 be a no-op, except we took some care to properly handle the _SUCCESS file.

 On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote:

  I didn’t get any response. It’d be really appreciated if anyone using a
 special OutputCommitter for S3 could comment on this!

  Thanks,
 Mingyu

   From: Mingyu Kim m...@palantir.com
 Date: Monday, February 16, 2015 at 1:15 AM
 To: user@spark.apache.org
 Subject: Which OutputCommitter to use for S3?

   Hi all,

  The default OutputCommitter used by RDDs, which is FileOutputCommitter,
 seems to require moving files at the commit step, which is not a
 constant-time operation in S3, as discussed in
 http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
 People seem to develop their own NullOutputCommitter implementation or use
 DirectFileOutputCommitter (as mentioned in SPARK-3595,
 https://issues.apache.org/jira/browse/SPARK-3595), but I wanted to check
 if there is a de facto standard, publicly available OutputCommitter to use
 for S3 in conjunction with Spark.

  Thanks,
 Mingyu
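
For readers finding this thread later, a hedged sketch of the general shape
of such a committer (not Databricks' actual class, and it omits their
_SUCCESS handling):

  import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

  // Every commit-time operation is a no-op, so output written directly to
  // the final S3 location is never renamed or copied at commit.
  class DirectOutputCommitter extends OutputCommitter {
    override def setupJob(jobContext: JobContext): Unit = {}
    override def setupTask(taskContext: TaskAttemptContext): Unit = {}
    override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
    override def commitTask(taskContext: TaskAttemptContext): Unit = {}
    override def abortTask(taskContext: TaskAttemptContext): Unit = {}
  }

A committer like this is only safe when partial output from failed task
attempts is acceptable or overwritten, which is part of why it is not the
default.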