Re: Spark DataFrames uses too many partition

2015-08-12 Thread Al M
The DataFrame parallelism is currently controlled through the configuration
option spark.sql.shuffle.partitions.  The default value is 200.

I have raised an improvement JIRA to make it possible to specify the number
of partitions: https://issues.apache.org/jira/browse/SPARK-9872
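
For example, to change it today (a sketch only; sc comes from spark-shell,
and dfA, dfB, the join key and the value 32 are all placeholders):

  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)
  // Lower the number of post-shuffle partitions from the default 200.
  sqlContext.setConf("spark.sql.shuffle.partitions", "32")

  // Every DataFrame shuffle (join, aggregation) after this point
  // produces 32 partitions.
  val joined = dfA.join(dfB, "key")
  println(joined.rdd.partitions.size)   // expect 32

The same value can be passed with --conf spark.sql.shuffle.partitions=32.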



View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrames-uses-too-many-partition-tp24214p24223.html



Re: Shuffle produces one huge partition and many tiny partitions

2015-06-18 Thread Al M
Thanks for the suggestion.  Repartition didn't help us, unfortunately; it
still puts everything into the same partition.

We did manage to improve the situation by writing a new partitioner that
extends HashPartitioner and treats certain exceptional keys differently:
keys that are known to appear very often are assigned random partitions
instead of going through the normal hash-based assignment.
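
For the archives, a rough sketch of that idea (not our exact code; the class
name, the hot-key set and the partition count are placeholders):

  import scala.util.Random
  import org.apache.spark.HashPartitioner

  class SkewAwarePartitioner(partitions: Int, hotKeys: Set[Any])
      extends HashPartitioner(partitions) {

    override def getPartition(key: Any): Int =
      if (hotKeys.contains(key))
        Random.nextInt(partitions)   // spread heavy hitters over random partitions
      else
        super.getPartition(key)      // everything else hashes as before
  }

  // e.g. pairRdd.partitionBy(new SkewAwarePartitioner(200, Set("hotKey")))

Records with the hot keys lose co-location, so this only suits operations
that can tolerate that.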



View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html



Re: Limit Spark Shuffle Disk Usage

2015-06-17 Thread Al M
Thanks Himanshu and RahulKumar!

The Databricks forum post was extremely useful.  It is great to see an
article that clearly details how and when shuffles are cleaned up.



View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-tp23279p23359.html



Shuffle produces one huge partition

2015-06-17 Thread Al M
I have 2 RDDs I want to join.  We will call them RDD A and RDD B.  RDD A has
1 billion rows; RDD B has 100k rows.  I want to join them on a single key.

95% of the rows in RDD A have the same key to join with RDD B.  Before I can
join the two RDDs, I must map them to tuples where the first element is the
key and the second is the value.
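
In code, that step looks roughly like this (rddA, rddB and extractKey are
placeholders for our real records and key logic):

  // Build (key, value) pair RDDs, then join on the key.
  val pairsA = rddA.map(rec => (extractKey(rec), rec))   // ~1 billion rows
  val pairsB = rddB.map(rec => (extractKey(rec), rec))   // ~100k rows
  val joined = pairsA.join(pairsB)   // shuffles both sides by key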

Since 95% of the rows in RDD A have the same key, they now go into the same
partition.  When I perform the join, the system will try to execute this
partition in just one task.  This one task will try to load too much data
into memory at once and die a horrible death.

I know that this is caused by the HashPartitioner that is used by default in
Spark; everything with the same hashed key goes into the same partition.  I
also tried the RangePartitioner but still saw 95% of the data go into the
same partition.  What I'd really like is a partitioner that puts everything
with the same key into the same partition *except* when a partition grows
over a certain size, in which case it would spill into the next partition.
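
For reference, the default assignment boils down to something like this
(simplified sketch, not Spark's exact source):

  // Every record with the same key gets the same partition index, so one
  // dominant key means one enormous partition.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod   // keep the index non-negative
  }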

Writing my own partitioner is a big job, and requires a lot of testing to
make sure I get it right.  Is there a simpler way to solve this?



View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-tp23358.html



Limit Spark Shuffle Disk Usage

2015-06-11 Thread Al M
I am using Spark on a machine with limited disk space.  I am using it to
analyze very large (100GB to 1TB per file) datasets stored in HDFS.  When I
analyze these datasets, I run groups, joins and cogroups.  All of these
operations mean lots of shuffle files written to disk.
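
For context, those shuffle files land in Spark's local scratch directories,
controlled by spark.local.dir (on YARN the NodeManager's local dirs are used
instead).  A sketch of pointing them at a larger volume, with a placeholder
path and app name:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("shuffle-heavy-job")
    // Scratch space for shuffle and spill files.
    .set("spark.local.dir", "/mnt/bigdisk/spark-tmp")

That only moves the problem to another disk rather than limiting the space
used.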

Unfortunately what happens is that my disk fills up very quickly (I only have
40GB free), and then my process dies because it runs out of disk space.
I don't want to write my shuffles to HDFS because it's already pretty full.
The shuffle files are cleared up between runs, but this doesn't help when a
single run requires 300GB+ of shuffle disk space.

Is there any way that I can limit the amount of disk space used by my
shuffles?  I could set up a cron job to delete old shuffle files whilst the
job is still running, but I'm concerned that they are left there for a good
reason.



View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-tp23279.html



Re: Spark Driver Host under Yarn

2015-02-09 Thread Al M
Yarn-cluster.  When I run in yarn-client mode, the driver just runs on the
machine that runs spark-submit.



View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Driver-Host-under-Yarn-tp21536p21558.html



Re: getting error when submit spark with master as yarn

2015-02-09 Thread Al M
Open up 'yarn-site.xml' in your Hadoop configuration.  You want to set
yarn.nodemanager.resource.memory-mb and
yarn.scheduler.maximum-allocation-mb.  Have a look here for details on how
they work:
https://hadoop.apache.org/docs/r2.3.0/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
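
For example (the values below are only illustrative; size them to your
nodes):

  <!-- yarn-site.xml -->
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>16384</value>   <!-- total memory YARN may allocate on each node -->
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>8192</value>    <!-- largest single container YARN will grant -->
  </property>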



View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/getting-error-when-submit-spark-with-master-as-yarn-tp21542p21547.html



Spark Driver Host under Yarn

2015-02-06 Thread Al M
I'm running Spark 1.2 with Yarn.  My logs show that my executors are failing
to connect to my driver.  This is because they are using the wrong hostname.

Since I'm running with Yarn, I can't set spark.driver.host as explained in
SPARK-4253.  So it should come from my HDFS configuration.  Do you know
which piece of HDFS configuration determines my driver hostname?  

It's definitely not using the hostname I have in
yarn-site.xml:yarn.resourcemanager.hostname or
core-site.xml:fs.default.name.



View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Driver-Host-under-Yarn-tp21536.html



Re: Testing if an RDD is empty?

2015-01-15 Thread Al M
You can also check rdd.partitions.size.  This will be 0 for empty RDDs and
greater than 0 for RDDs with data.
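
A quick illustration (Spark 1.x spark-shell; sketch only):

  sc.emptyRDD[Int].partitions.size          // 0
  sc.parallelize(1 to 10).partitions.size   // > 0

One caveat: an RDD produced by a transformation such as filter can keep
partitions that are all empty, so a non-zero partitions.size doesn't always
mean there is data.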



View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Testing-if-an-RDD-is-empty-tp1678p21170.html



Spark 1.2 Release Date

2014-12-18 Thread Al M
Is there a planned release date for Spark 1.2?  I saw on the Spark Wiki
(https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage) that we
are already in the latter part of the release window.



View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date-tp20765.html



Re: Spark 1.2 Release Date

2014-12-18 Thread Al M
Awesome.  Thanks!



View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date-tp20765p20767.html



Failed fetch: Could not get block(s)

2014-12-03 Thread Al M
I am using Spark 1.1.1.  I am seeing an issue that only appears when I run in
standalone cluster mode with at least 2 workers.  The workers are on
separate physical machines.
I am performing a simple join on 2 RDDs.  After the join I run first() on
the joined RDD (in Scala) to get the first result.  When this first() runs
on Worker A it works fine; when the first() runs on Worker B I get a
'Fetch Failure' error.
I looked at the stderr log in the work directory for Worker B.  It shows the
following exception:

  INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 2 remote fetches in 2 ms
  ERROR BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(, )
  java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec
      at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:866)

It is trying to connect to the ConnectionManager for the BlockManager on
Worker A from Worker B.  It manages to connect, but it always times out.
When I try to connect via telnet I see the same thing: it connects, but I
don't get anything back from the host.
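
An aside for anyone hitting the same thing: the 60 seconds in the exception
is the connection ack timeout, listed in the Spark 1.1/1.2 configuration
docs as spark.core.connection.ack.wait.timeout.  Raising it (sketch below,
the value is arbitrary) only papers over long GC pauses; it is not a fix for
a genuine connectivity problem:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // Seconds to wait for an ack before the fetch is considered failed.
    .set("spark.core.connection.ack.wait.timeout", "120")
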
I noticed that two other people reported this issue
(http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Freezing-while-running-TPC-H-query-5-td14902.html).
Unfortunately there was no meaningful progress.




View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Failed-fetch-Could-not-get-block-s-tp20262.html