Re: Limit pyspark.daemon threads

2016-03-26 Thread Sven Krasser
up by implementing flock in the jobs and changing how teardowns of the spark cluster work as far as failed workers. Thanks again, —Ken. On Mar 26, 2016, at 4:08 PM, Sven Krasser <kras...@gmail.com> wrote: My understanding is that the spark.executor.c

Re: Limit pyspark.daemon threads

2016-03-26 Thread Sven Krasser
d a way, but that’s why the high load nukes the nodes. I don’t have the spark.executor.cores set, but will setting that to, say, 12 limit the pyspark threads, or will it just limit the JVM threads? Thanks! Ken. On Mar 25, 2016, at 9:10 PM, Sven Krasser <kras...@gmail

Re: Testing spark with AWS spot instances

2016-03-25 Thread Sven Krasser
When a spot instance terminates, you lose all data (RDD partitions) stored in the executors that ran on that instance. Spark can recreate the partitions from input data, but if that requires going through multiple preceding shuffles a good chunk of the job will need to be redone. -Sven On Thu,

Re: Limit pyspark.daemon threads

2016-03-25 Thread Sven Krasser
Hey Ken, I also frequently see more pyspark daemons than configured concurrency; often it's a low multiple. (There was an issue pre-1.3.0 that caused this to be quite a bit higher, so make sure you at least have a recent version; see SPARK-5395.) Each pyspark daemon tries to stay below the

Re: pyspark issue

2015-07-27 Thread Sven Krasser
It expects an iterable, and if you iterate over a string, you get the individual characters. Use a list instead: pyfiles=['/path/to/file'] On Mon, Jul 27, 2015 at 2:40 PM, Naveen Madhire vmadh...@umail.iu.edu wrote: Hi, I am running pyspark in windows and I am seeing an error while adding
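The pitfall can be seen without Spark at all. A minimal local sketch (the file path is hypothetical, for illustration only):

```python
# Why a bare string breaks: Spark iterates over the pyFiles argument,
# and iterating a string yields its individual characters.
path = '/path/to/file.py'  # hypothetical path
print(list(path)[:4])      # ['/', 'p', 'a', 't']

# Wrapping the path in a list gives Spark one whole path to iterate over:
pyfiles = [path]
print(pyfiles)             # ['/path/to/file.py']
```

In PySpark this would then be passed along the lines of `SparkContext(pyFiles=pyfiles)`.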

Re: reduceByKey - add values to a list

2015-06-25 Thread Sven Krasser
Hey Kannappan, First of all, what is the reason for avoiding groupByKey since this is exactly what it is for? If you must use reduceByKey with a one-liner, then take a look at this: lambda a,b: (a if type(a) == list else [a]) + (b if type(b) == list else [b]) In contrast to groupByKey, this
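Plugging that one-liner into plain `functools.reduce` shows how the list accumulates for a single key's values (a local sketch, not the distributed call):

```python
from functools import reduce

# The one-liner from the message: wrap scalars in lists, then concatenate.
merge = lambda a, b: (a if type(a) == list else [a]) + (b if type(b) == list else [b])

values = [1, 2, 3, 4]  # values for a single key
print(reduce(merge, values))  # [1, 2, 3, 4]
```

One caveat worth noting: a key with a single value is never passed through the merge function, so it stays a scalar rather than a one-element list, and downstream code has to handle both shapes.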

Re: reduceByKey - add values to a list

2015-06-25 Thread Sven Krasser
. and the operation is an associative operation, so minimal shuffle if done via reduceByKey. On Jun 26, 2015, at 12:25 AM, Sven Krasser kras...@gmail.com wrote: Hey Kannappan, First of all, what is the reason for avoiding groupByKey since this is exactly what it is for? If you must use

Re: reduceByKey - add values to a list

2015-06-25 Thread Sven Krasser
On Thu, Jun 25, 2015 at 5:01 PM, Kannappan Sirchabesan buildka...@gmail.com wrote: On Jun 26, 2015, at 12:46 AM, Sven Krasser kras...@gmail.com wrote: In that case the reduceByKey operation will likely not give you any benefit (since you are not aggregating data into smaller values

Re: indexing an RDD [Python]

2015-04-29 Thread Sven Krasser
provide a quick example? Also, I’m not quite sure how this work, but the resulting RDD should be a clone, as I may need to modify the values and preserve the original ones. Thank you, *From:* Sven Krasser [mailto:kras...@gmail.com] *Sent:* Friday, April 24, 2015 5:56 PM

Re: Slower performance when bigger memory?

2015-04-29 Thread Sven Krasser
On Mon, Apr 27, 2015 at 7:36 AM, Shuai Zheng szheng.c...@gmail.com wrote: Thanks. So may I know what is your configuration for more/smaller executors on r3.8xlarge, how big of the memory that you eventually decide to give one executor without impact performance (for example: 64g? ). We're

Re: How to debug Spark on Yarn?

2015-04-24 Thread Sven Krasser
For #1, click on a worker node on the YARN dashboard. From there, Tools > Local logs > Userlogs has the logs for each application, and you can view them by executor even while an application is running. (This is for Hadoop 2.4, things may have changed in 2.6.) -Sven On Thu, Apr 23, 2015 at 6:27 AM,

Re: How to debug Spark on Yarn?

2015-04-24 Thread Sven Krasser
On Fri, Apr 24, 2015 at 11:31 AM, Marcelo Vanzin van...@cloudera.com wrote: Spark 1.3 should have links to the executor logs in the UI while the application is running. Not yet in the history server, though. You're absolutely correct -- didn't notice it until now. This is a great addition!

Re: Slower performance when bigger memory?

2015-04-24 Thread Sven Krasser
FWIW, I ran into a similar issue on r3.8xlarge nodes and opted for more/smaller executors. Another observation was that one large executor results in less overall read throughput from S3 (using Amazon's EMRFS implementation) in case that matters to your application. -Sven On Thu, Apr 23, 2015 at

Re: indexing an RDD [Python]

2015-04-24 Thread Sven Krasser
The solution depends largely on your use case. I assume the index is in the key. In that case, you can make a second RDD out of the list of indices and then use cogroup() on both. If the list of indices is small, just using filter() will work well. If you need to read back a few select values to
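For the small-index-list case, the filter approach can be sketched locally like this (pair values and the index set are made up for illustration):

```python
# Local analogue of filtering an RDD of (index, value) pairs by a small
# set of wanted indices, which is cheap to ship to every worker.
pairs = [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
wanted = {1, 3}

# In PySpark this would be (sketch):
#   rdd.filter(lambda kv: kv[0] in wanted)
selected = [kv for kv in pairs if kv[0] in wanted]
print(selected)  # [(1, 'b'), (3, 'd')]
```

Because filtering leaves the original RDD untouched, the result is effectively the "clone" asked about upthread: modifying the filtered values does not affect the source RDD.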

Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Sven Krasser
Hey Joe, With the ephemeral HDFS, you get the instance store of your worker nodes. For m3.xlarge that will be two 40 GB SSDs local to each instance, which are very fast. For the persistent HDFS, you get whatever EBS volumes the launch script configured. EBS volumes are always network drives, so

Re: Programmatic Spark 1.2.0 on EMR | S3 filesystem is not working when using

2015-01-30 Thread Sven Krasser
From your stacktrace it appears that the S3 writer tries to write the data to a temp file on the local file system first. Taking a guess, that local directory doesn't exist or you don't have permissions for it. -Sven On Fri, Jan 30, 2015 at 6:44 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com

Re: Define size partitions

2015-01-30 Thread Sven Krasser
You can also use your InputFormat/RecordReader in Spark, e.g. using newAPIHadoopFile. See here: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext . -Sven On Fri, Jan 30, 2015 at 6:50 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I want to process

Snappy Crash

2015-01-28 Thread Sven Krasser
I'm running into a new issue with Snappy causing a crash (using Spark 1.2.0). Did anyone see this before? -Sven 2015-01-28 16:09:35,448 WARN [shuffle-server-1] storage.MemoryStore (Logging.scala:logWarning(71)) - Failed to reserve initial memory threshold of 1024.0 KB for computing block

Re: Index wise most frequently occurring element

2015-01-27 Thread Sven Krasser
Use combineByKey. For top 10 as an example (bottom 10 works similarly): add the element to a list. If the list is larger than 10, delete the smallest elements until size is back to 10. -Sven On Tue, Jan 27, 2015 at 3:35 AM, kundan kumar iitr.kun...@gmail.com wrote: I have an array of the form
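The three functions combineByKey needs could look like this, using a bounded min-heap for the "drop the smallest" step. This is a local sketch with N=3 for brevity (the message suggests 10); the distributed call would be `rdd.combineByKey(create_combiner, merge_value, merge_combiners)`:

```python
import heapq

N = 3  # keep the top N values per key

def create_combiner(v):
    return [v]

def merge_value(acc, v):
    # maintain a min-heap of at most N values; drop the smallest on overflow
    heapq.heappush(acc, v)
    if len(acc) > N:
        heapq.heappop(acc)
    return acc

def merge_combiners(a, b):
    for v in b:
        a = merge_value(a, v)
    return a

# Local simulation over one key's values:
acc = create_combiner(5)
for v in [1, 9, 7, 3, 8]:
    acc = merge_value(acc, v)
print(sorted(acc))  # [7, 8, 9]
```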

Re: Large number of pyspark.daemon processes

2015-01-27 Thread Sven Krasser
-tabpanel#comment-14294570 ). Any ideas to what coalesce() is doing that triggers the creation of additional workers? On Sat, Jan 24, 2015 at 12:27 AM, Sven Krasser kras...@gmail.com wrote: Hey Davies, Sure thing, it's filed here now: https://issues.apache.org/jira/browse/SPARK-5395 As far

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-01-27 Thread Sven Krasser
Since it's an executor running OOM it doesn't look like a container being killed by YARN to me. As a starting point, can you repartition your job into smaller tasks? -Sven On Tue, Jan 27, 2015 at 2:34 PM, Guru Medasani gdm...@outlook.com wrote: Hi Anthony, What is the setting of the total

Re: Large number of pyspark.daemon processes

2015-01-24 Thread Sven Krasser
. Thank you! -Sven On Fri, Jan 23, 2015 at 11:52 PM, Davies Liu dav...@databricks.com wrote: It should be a bug, the Python worker did not exit normally, could you file a JIRA for this? Also, could you show how to reproduce this behavior? On Fri, Jan 23, 2015 at 11:45 PM, Sven Krasser kras

Large number of pyspark.daemon processes

2015-01-23 Thread Sven Krasser
Hey all, I am running into a problem where YARN kills containers for being over their memory allocation (which is about 8G for executors plus 6G for overhead), and I noticed that in those containers there are tons of pyspark.daemon processes hogging memory. Here's a snippet from a container with

Re: Problems saving a large RDD (1 TB) to S3 as a sequence file

2015-01-23 Thread Sven Krasser
Hey Darin, Are you running this over EMR or as a standalone cluster? I've had occasional success in similar cases by digging through all executor logs and trying to find exceptions that are not caused by the application shutdown (but the logs remain my main pain point with Spark). That aside,

Re: Large number of pyspark.daemon processes

2015-01-23 Thread Sven Krasser
PySpark to share a pool of processes instead of starting a new one for each task. -Sandy On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote: Hey all, I am running into a problem where YARN kills containers for being over their memory allocation (which is about 8G for executors

Re: Large number of pyspark.daemon processes

2015-01-23 Thread Sven Krasser
Sven, What version of Spark are you running? Recent versions have a change that allows PySpark to share a pool of processes instead of starting a new one for each task. -Sandy On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote: Hey all, I am running into a problem where

Re: Manually trigger RDD map function without action

2015-01-12 Thread Sven Krasser
Hey Kevin, I assume you want to trigger the map() for a side effect (since you don't care about the result). To Cody's point, you can use foreach() *instead* of map(). So instead of e.g. x.map(a = foo(a)).foreach(a = a), you'd run x.foreach(a = foo(a)). Best, -Sven On Mon, Jan 12, 2015 at 5:13
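A local sketch of the side-effect-only pattern (the PySpark call is shown in a comment; `foo` and its effect are stand-ins):

```python
# foo() is called for its side effect; the return value is discarded.
# In PySpark (sketch): rdd.foreach(foo) -- no map() or dummy action needed.
seen = []

def foo(a):
    seen.append(a * 2)  # stand-in side effect (e.g. writing to an external store)

for a in [1, 2, 3]:  # local stand-in for the RDD's elements
    foo(a)

print(seen)  # [2, 4, 6]
```

One caveat: foreach runs on the executors, so the side effect must not rely on driver-local state; the local list here works only because this sketch runs in a single process.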

Re: quickly counting the number of rows in a partition?

2015-01-12 Thread Sven Krasser
Yes, using mapPartitionsWithIndex, e.g. in PySpark: sc.parallelize(xrange(0,1000), 4).mapPartitionsWithIndex(lambda idx,iter: ((idx, len(list(iter))),)).collect() [(0, 250), (1, 250), (2, 250), (3, 250)] (This is not the most efficient way to get the length of an iterator, but you get the
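As the message notes, `len(list(iter))` materializes the whole partition in memory. A sketch that counts lazily instead (the partition function is testable without Spark):

```python
def count_partition(idx, it):
    # consume the iterator lazily instead of materializing it with list()
    yield (idx, sum(1 for _ in it))

# PySpark (sketch): rdd.mapPartitionsWithIndex(count_partition).collect()
print(list(count_partition(0, iter(range(250)))))  # [(0, 250)]
```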

Re: OOM exception during row deserialization

2015-01-12 Thread Sven Krasser
Hey Pala, I also find it very hard to get to the bottom of memory issues such as this one based on what's in the logs (so if you come up with some findings, then please share here). In the interim, here are a few things you can try: - Provision more memory per executor. While in theory (and

Re: Trouble with large Yarn job

2015-01-12 Thread Sven Krasser
Anders, This could be related to this open ticket: https://issues.apache.org/jira/browse/SPARK-5077. A call to coalesce() also fixed that for us as a stopgap. Best, -Sven On Mon, Jan 12, 2015 at 10:18 AM, Anders Arpteg arp...@spotify.com wrote: Yes sure Sandy, I've checked the logs and it's

Re: can I buffer flatMap input at each worker node?

2015-01-12 Thread Sven Krasser
Not sure I understand correctly, but it sounds like you're looking for mapPartitions(). -Sven On Mon, Jan 12, 2015 at 10:17 AM, maherrt mahe...@hotmail.com wrote: Dear All what i want to do is : as the data is partitioned on many worker nodes I want to be able to process this partition of

Re: Shuffle Problems in 1.2.0

2015-01-07 Thread Sven Krasser
, but in standalone mode, it worked fine. Could you try to narrow down the possible change of configurations? Davies On Tue, Jan 6, 2015 at 8:03 PM, Sven Krasser kras...@gmail.com wrote: Hey Davies, Here are some more details on a configuration that causes this error for me. Launch an AWS

Re: Shuffle Problems in 1.2.0

2015-01-06 Thread Sven Krasser
, no cache needed anymore) Davies On Tue, Jan 6, 2015 at 9:02 AM, Sven Krasser kras...@gmail.com wrote: The issue has been sensitive to the number of executors and input data size. I'm using 2 executors with 4 cores each, 25GB of memory, 3800MB of memory overhead for YARN. This will fit onto

Re: Shuffle Problems in 1.2.0

2015-01-06 Thread Sven Krasser
had ran your scripts in 5 nodes ( 2 CPUs, 8G mem) cluster, can not reproduce your failure. Should I test it with big memory node? On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser kras...@gmail.com wrote: Thanks for the input! I've managed to come up with a repro of the error with test data only

Re: Shuffle Problems in 1.2.0

2015-01-05 Thread Sven Krasser
: unexpected value: List([B@130dc7ad)” error suggests that maybe there’s an issue with PySpark’s serialization / tracking of types, but it’s hard to say from this error trace alone. On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com) wrote: Hey Josh, I am still trying to prune

Re: Spark or Tachyon: capture data lineage

2015-01-02 Thread Sven Krasser
Agreed with Jerry. Aside from Tachyon, seeing this for general debugging would be very helpful. Haoyuan, is that feature you are referring to related to https://issues.apache.org/jira/browse/SPARK-975? In the interim, I've found the toDebugString() method useful (but it renders execution as a

Shuffle Problems in 1.2.0

2014-12-30 Thread Sven Krasser
Hey all, Since upgrading to 1.2.0 a pyspark job that worked fine in 1.1.1 fails during shuffle. I've tried reverting from the sort-based shuffle back to the hash one, and that fails as well. Does anyone see similar problems or has an idea on where to look next? For the sort-based shuffle I get a

Re: S3 files , Spark job hungsup

2014-12-30 Thread Sven Krasser
This here may also be of help: http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html. Make sure to spread your objects across multiple partitions to not be rate limited by S3. -Sven On Mon, Dec 22, 2014 at 10:20 AM, durga katakam durgak...@gmail.com wrote: Yes . I

Re: Shuffle Problems in 1.2.0

2014-12-30 Thread Sven Krasser
, Sven Krasser kras...@gmail.com wrote: Hey all, Since upgrading to 1.2.0 a pyspark job that worked fine in 1.1.1 fails during shuffle. I've tried reverting from the sort-based shuffle back to the hash one, and that fails as well. Does anyone see similar problems or has an idea on where