I am not sure what you mean when you say "table" is comprised of 200/1200 partitions.


A "partition" could mean a chunk of the dataset (RDD/DataFrame) that Spark splits up 
and processes in parallel, or it could mean a Hive partition defined in the table's 
metadata.

If you mean the first one, then you control the number of shuffle partitions with 
'spark.sql.shuffle.partitions', which defaults to 200.
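For reference, that value can be set either on the SQLContext or at submit time. A 
minimal sketch (assuming Spark 1.6 and the 1200 you mentioned):

// set programmatically before running the query
sqlContext.setConf("spark.sql.shuffle.partitions", "1200")

// or equivalently at submit time:
// spark-submit --conf spark.sql.shuffle.partitions=1200 ...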

I would be surprised if a query works with the default of 200 but fails with the new 
value of 1200. In general, increasing this value forces more partitions in your DF, 
which means less data per partition. Setting it too high may hurt your performance, 
but it should not fail a job that runs fine with a lower configured value.

Yong

________________________________
From: Joseph Naegele <jnaeg...@grierforensics.com>
Sent: Friday, January 6, 2017 1:14 PM
To: 'user'
Subject: Spark SQL 1.6.3 ORDER BY and partitions

I have two separate but similar issues that I've narrowed down to a pretty good 
level of detail. I'm using Spark 1.6.3, particularly Spark SQL.

I'm concerned with a single dataset for now, although the details apply to 
other, larger datasets. I'll call it "table". It's around 160 M records, 
average of 78 bytes each, so about 12 GB uncompressed. It's 2 GB compressed in 
HDFS.

First issue:
The following query works when "table" is comprised of 200 partitions (on disk), but 
when "table" is comprised of 1200 partitions it fails with the error "Total size of 
serialized results of 1031 tasks (6.0 GB) is bigger than spark.driver.maxResultSize 
(6.0 GB)":

SELECT * FROM orc.`table` ORDER BY field DESC LIMIT 100000;
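I know I could raise the cap, or remove it, at submit time; a sketch only (8g is an 
arbitrary value, and "0" disables the limit entirely):

// hypothetical conf in the driver program
import org.apache.spark.SparkConf
val conf = new SparkConf().set("spark.driver.maxResultSize", "8g")
// or: spark-submit --conf spark.driver.maxResultSize=0 ...

But I'd rather understand why the driver is collecting so much data in the first place.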

This is possibly related to the TakeOrderedAndProject step in the execution 
plan, because the following queries do not give me problems:

SELECT * FROM orc.`table`;
SELECT * FROM orc.`table` ORDER BY field DESC;
SELECT * FROM orc.`table` LIMIT 100000;

All of these queries have different execution plans.
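(For reference, I'm comparing the plans with something like the following; just a 
sketch.)

// extended = true prints the logical plans in addition to the physical plan
sqlContext.sql("SELECT * FROM orc.`table` ORDER BY field DESC LIMIT 100000").explain(true)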
My "table" has 1200 partitions because I must use a large value for 
spark.sql.shuffle.partitions to handle joins and window functions on much 
larger DataFrames in my application. Too many partitions may be suboptimal, but 
it shouldn't lead to large serialized results, correct?
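One alternative I've considered is to repartition the large DataFrames explicitly and 
leave spark.sql.shuffle.partitions at the default; a sketch with hypothetical 
table/column names:

// raise parallelism only where it's needed, instead of globally
val bigDf = sqlContext.table("big_table")
val wide = bigDf.repartition(1200, bigDf("join_key"))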

Any ideas? I've seen https://issues.apache.org/jira/browse/SPARK-12837, but I 
think my issue is a bit more specific.


Second issue:
This issue concerns the difference in execution when calling .cache() and .count() on 
the following two DataFrames:

A: sqlContext.sql("SELECT * FROM table")
B: sqlContext.sql("SELECT * FROM table ORDER BY field DESC")
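Concretely, the sequence for B is roughly (a sketch of what I'm running):

val b = sqlContext.sql("SELECT * FROM table ORDER BY field DESC")
b.cache()   // this alone spawns a Spark job, before any action is called
b.count()   // this spawns a second job that appears to re-read from HDFS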

Counting the rows of A works as expected. A single Spark job with 2 stages. 
Load from Hadoop, map, aggregate, reduce to a number.

The same can't be said for B, however. The .cache() call spawns a Spark job 
before I even call .count(), loading from HDFS and performing ConvertToSafe and 
Exchange. The .count() call spawns another job, the first task of which appears 
to re-load from HDFS and again perform ConvertToSafe and Exchange, writing 1200 
shuffle partitions. The next stage then proceeds to read the shuffle data 
across only 2 tasks. One of these tasks completes immediately and the other 
runs indefinitely, failing because the partition is too large (the 
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE error).

Does this behavior make sense at all? Obviously it doesn't make sense to sort 
rows if I'm just counting them, but this is a simplified example of a more 
complex application in which caching makes sense. My executors have more than 
enough memory to cache this entire DataFrame.
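If it helps, my working assumption is that a single block is hitting the 2 GB 
(Integer.MAX_VALUE) limit, so one thing I may try is forcing more partitions before 
caching. A sketch only (400 is an arbitrary count, and the extra repartition 
reshuffles the sorted output):

// hypothetical workaround: spread the rows over more, smaller partitions before caching
val b = sqlContext.sql("SELECT * FROM table ORDER BY field DESC").repartition(400)
b.cache()
b.count()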

Thanks for reading

---
Joe Naegele
Grier Forensics


