I am not sure what do you mean that "table" is comprised of 200/1200 partitions.
A partition could mean the dataset(RDD/DataFrame) will be chunked within Spark, then processed; Or it could mean you define the metadata in the Hive of the partitions of the table. If you mean the first one, so you control the number of partitions by 'spark.sql.shuffle.partitions', which has the default value of 200. I will be surprised that a query works with default 200, but fails with the new value you set as 1200. As in general, when you increase this value, you force more partitions in your DF, which will lead less data per partition. So if you overset this value, it will hurt your performance, but should fail your job, if you can run the same job with less configured value. Yong ________________________________ From: Joseph Naegele <jnaeg...@grierforensics.com> Sent: Friday, January 6, 2017 1:14 PM To: 'user' Subject: Spark SQL 1.6.3 ORDER BY and partitions I have two separate but similar issues that I've narrowed down to a pretty good level of detail. I'm using Spark 1.6.3, particularly Spark SQL. I'm concerned with a single dataset for now, although the details apply to other, larger datasets. I'll call it "table". It's around 160 M records, average of 78 bytes each, so about 12 GB uncompressed. It's 2 GB compressed in HDFS. First issue: The following query works if "table" is comprised of 200 partitions (on disk), but fails when "table" is 1200 partitions with the "Total size of serialized results of 1031 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)" error: SELECT * FROM orc.`table` ORDER BY field DESC LIMIT 100000; This is possibly related to the TakeOrderedAndProject step in the execution plan, because the following queries do not give me problems: SELECT * FROM orc.`table`; SELECT * FROM orc.`table` ORDER BY field DESC; SELECT * FROM orc.`table` LIMIT 100000; All of which have different execution plans. My "table" has 1200 partitions because I must use a large value for spark.sql.shuffle.partitions to handle joins and window functions on much larger DataFrames in my application. Too many partitions may be suboptimal, but it shouldn't lead to large serialized results, correct? Any ideas? I've seen https://issues.apache.org/jira/browse/SPARK-12837, but I think my issue is a bit more specific. Second issue: The difference between execution when calling .cache() and .count() on the following two DataFrames: A: sqlContext.sql("SELECT * FROM table") B: sqlContext.sql("SELECT * FROM table ORDER BY field DESC") Counting the rows of A works as expected. A single Spark job with 2 stages. Load from Hadoop, map, aggregate, reduce to a number. The same can't be said for B, however. The .cache() call spawns a Spark job before I even call .count(), loading from HDFS and performing ConvertToSafe and Exchange. The .count() call spawns another job, the first task of which appears to re-load from HDFS and again perform ConvertToSafe and Exchange, writing 1200 shuffle partitions. The next stage then proceeds to read the shuffle data across only 2 tasks. One of these tasks completes immediately and the other runs indefinitely, failing because the partition is too large (the java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE error). Does this behavior make sense at all? Obviously it doesn't make sense to sort rows if I'm just counting them, but this is a simplified example of a more complex application in which caching makes sense. My executors have more than enough memory to cache this entire DataFrame. Thanks for reading --- Joe Naegele Grier Forensics --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org