Re: Spark DataFrames uses too many partitions
DataFrame parallelism is currently controlled through the configuration option spark.sql.shuffle.partitions; the default value is 200. I have raised an improvement JIRA to make it possible to specify the number of partitions: https://issues.apache.org/jira/browse/SPARK-9872
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrames-uses-too-many-partition-tp24214p24223.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
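For reference, the option can also be set per application at submit time; a minimal sketch (the application jar name is a placeholder):

```
# Use fewer shuffle partitions for a small job (default is 200)
spark-submit --conf spark.sql.shuffle.partitions=32 my-app.jar
```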
Re: Shuffle produces one huge partition and many tiny partitions
Thanks for the suggestion. Unfortunately, repartition didn't help us; it still puts everything into the same partition. We did manage to improve the situation by writing a new partitioner that extends HashPartitioner and treats certain exceptional keys differently: keys known to appear very often are assigned random partitions instead of going through the existing partitioning mechanism.
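The idea described above (a HashPartitioner variant that scatters known hot keys) can be sketched outside Spark. A minimal, stdlib-only Python simulation — all names are invented for illustration, and Python's hash() merely stands in for Spark's key hashing:

```python
import random

def salted_partition(key, num_partitions, hot_keys, rng):
    """Mimic a HashPartitioner subclass that scatters known hot keys.

    Ordinary keys go to hash(key) % num_partitions, as Spark's
    HashPartitioner would; keys known to dominate the data are sent to a
    random partition instead, trading join locality for an even load.
    """
    if key in hot_keys:
        return rng.randrange(num_partitions)
    return hash(key) % num_partitions

# Simulate a skewed data set: 95% of 10,000 records share one key.
rng = random.Random(0)  # seeded so the sketch is repeatable
keys = ["hot"] * 9500 + ["k%d" % i for i in range(500)]
counts = [0] * 8
for k in keys:
    counts[salted_partition(k, 8, {"hot"}, rng)] += 1
# The 9,500 "hot" records now spread across all 8 partitions
# instead of landing in a single one.
```

Note the trade-off this makes: records for a scattered hot key no longer co-locate with their join partner, so the hot key's other side has to be broadcast or replicated to every partition.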
Re: Limit Spark Shuffle Disk Usage
Thanks Himanshu and RahulKumar! The databricks forum post was extremely useful. It is great to see an article that clearly details how and when shuffles are cleaned up.
Shuffle produces one huge partition
I have 2 RDDs I want to join; call them RDD A and RDD B. RDD A has 1 billion rows; RDD B has 100k rows. I want to join them on a single key. 95% of the rows in RDD A have the same key to join with RDD B.

Before I can join the two RDDs, I must map them to tuples where the first element is the key and the second is the value. Since 95% of the rows in RDD A share the same key, they all go into the same partition. When I perform the join, the system tries to execute this partition in a single task. That one task tries to load too much data into memory at once and dies a horrible death.

I know this is caused by the HashPartitioner used by default in Spark: everything with the same hashed key goes into the same partition. I also tried the RangePartitioner but still saw 95% of the data go into the same partition.

What I'd really like is a partitioner that puts everything with the same key into the same partition *except* when the partition grows over a certain size, at which point it would spill into the next partition. Writing my own partitioner is a big job and requires a lot of testing to get right. Is there a simpler way to solve this?
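The skew described above can be reproduced with a small stdlib-only simulation of hash partitioning (no Spark involved; the key names and counts are illustrative, and Python's hash() stands in for Spark's key hashing):

```python
from collections import Counter

def hash_partition(key, num_partitions):
    # Spark's default HashPartitioner works the same way: a record's
    # partition is determined solely by the hash of its key.
    return hash(key) % num_partitions

# 1,000 rows where 95% share one join key, spread over 16 partitions.
rows = [("popular", v) for v in range(950)] + \
       [("rare%d" % v, v) for v in range(50)]
sizes = Counter(hash_partition(k, 16) for k, _ in rows)

# Every "popular" row hashes identically, so one partition holds at
# least 950 of the 1,000 rows, no matter how many partitions we ask for.
largest = max(sizes.values())
```

Adding more partitions never helps here, because the partition a row lands in depends only on its key's hash, not on how full the partition already is.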
Limit Spark Shuffle Disk Usage
I am using Spark on a machine with limited disk space to analyze very large (100GB to 1TB per file) data sets stored in HDFS. When I analyze these datasets I run groups, joins, and cogroups, all of which mean lots of shuffle files written to disk. Unfortunately my disk fills up very quickly (I only have 40GB free), and then my process dies because there isn't enough space left. I don't want to write my shuffles to HDFS because it's already pretty full.

The shuffle files are cleared up between runs, but that doesn't help when a single run requires 300GB+ of shuffle disk space. Is there any way to limit the amount of disk space used by my shuffles? I could set up a cron job to delete old shuffle files while the job is still running, but I'm concerned that they are left there for a good reason.
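For reference, the directory those shuffle files are written to is set by spark.local.dir; it doesn't cap usage, but it can point the scratch space at a larger volume. A hedged sketch (the paths are placeholders, not recommendations):

```
# spark-defaults.conf -- example values only
# Shuffle/spill scratch space; several disks can be listed comma-separated.
spark.local.dir  /mnt/bigdisk1/spark-tmp,/mnt/bigdisk2/spark-tmp
```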
Re: Spark Driver Host under Yarn
Yarn-cluster. When I run in yarn-client mode, the driver just runs on the machine that runs spark-submit.
Re: getting error when submit spark with master as yarn
Open up 'yarn-site.xml' in your hadoop configuration. You want to set yarn.nodemanager.resource.memory-mb and yarn.scheduler.maximum-allocation-mb. Have a look here for details on how they work: https://hadoop.apache.org/docs/r2.3.0/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
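A sketch of what those entries look like in yarn-site.xml (the values below are examples only; size them to your nodes' actual memory):

```xml
<!-- yarn-site.xml: example values only -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <!-- Total RAM YARN may allocate to containers on each node -->
  <value>8192</value>
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <!-- Largest single container request YARN will grant -->
  <value>4096</value>
</property>
```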
Spark Driver Host under Yarn
I'm running Spark 1.2 with Yarn. My logs show that my executors are failing to connect to my driver because they are using the wrong hostname. Since I'm running with Yarn, I can't set spark.driver.host, as explained in SPARK-4253, so it should come from my HDFS configuration. Do you know which piece of HDFS configuration determines my driver hostname? It's definitely not using the hostname I have in yarn-site.xml:yarn.resourcemanager.hostname or core-site.xml:fs.default.name.
Re: Testing if an RDD is empty?
You can also check rdd.partitions.size. This will be 0 for empty RDDs and greater than 0 for RDDs with data.
Spark 1.2 Release Date
Is there a planned release date for Spark 1.2? I saw on the Spark Wiki https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage that we are already in the latter part of the release window.
Re: Spark 1.2 Release Date
Awesome. Thanks!
Failed fetch: Could not get block(s)
I am using Spark 1.1.1. I am seeing an issue that only appears when I run in standalone clustered mode with at least 2 workers, on separate physical machines. I am performing a simple join on 2 RDDs, then running first() on the joined RDD (in Scala) to get the first result. When first() runs on worker A it works fine; when it runs on worker B I get a 'Fetch Failure' error. The work stderr log for worker B shows the following exception:

INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 2 remote fetches in 2 ms
ERROR BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(, )
java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec
    at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:866)

Worker B is trying to connect to the ConnectionManager for the BlockManager on worker A. It manages to connect, but it always times out. When I try to connect via telnet I see the same: it connects, but I don't get anything back from the host. I noticed that two other people reported this issue: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Freezing-while-running-TPC-H-query-5-td14902.html . Unfortunately there was no meaningful progress.