[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16280271#comment-16280271 ]

Darren Govoni commented on SPARK-17788:
---------------------------------------

I'm also running into this error on Spark 2.1.0:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 11.0 failed 4 times, most recent failure: Lost task 42.3 in stage 11.0 (TID 7544, bdr-itwp-hdfs-2.dev.uspto.gov, executor 2): java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes

> RangePartitioner results in few very large tasks and many small to empty tasks
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-17788
>                 URL: https://issues.apache.org/jira/browse/SPARK-17788
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.0.0
>         Environment: Ubuntu 14.04 64bit
> Java 1.8.0_101
>            Reporter: Babak Alipour
>            Assignee: Wenchen Fan
>             Fix For: 2.3.0
>
>
> Greetings everyone,
> I was trying to read a single field of a Hive table stored as Parquet in
> Spark (~140GB for the entire table, this single field is a Double, ~1.4B
> records) and look at the sorted output using the following:
> sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC")
> But this simple line of code gives:
> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with
> more than 17179869176 bytes
> Same error for:
> sql("SELECT " + field + " FROM MY_TABLE").sort(field)
> and:
> sql("SELECT " + field + " FROM MY_TABLE").orderBy(field)
> After doing some searching, the issue seems to lie in the RangePartitioner
> trying to create equal ranges. [1]
> [1]
> https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/RangePartitioner.html
>
> The Double values I'm trying to sort are mostly in the range [0,1] (~70% of
> the data, which roughly equates to 1 billion records); other numbers in the
> dataset are as high as 2000. With the RangePartitioner trying to create equal
> ranges, some tasks are becoming almost empty while others are extremely
> large, due to the heavily skewed distribution.
> This is either a bug in Apache Spark or a major limitation of the framework.
> I hope one of the devs can help solve this issue.
> P.S. Email thread on Spark user mailing list:
> http://mail-archives.apache.org/mod_mbox/spark-user/201610.mbox/%3CCA%2B_of14hTVYTUHXC%3DmS9Kqd6qegVvkoF-ry3Yj2%2BRT%2BWSBNzhg%40mail.gmail.com%3E

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16225316#comment-16225316 ]

Wenchen Fan commented on SPARK-17788:
-------------------------------------

Unfortunately I don't have a reproducible code snippet to prove it has been fixed, but I'm pretty confident my fix should work for it. cc [~babak.alip...@gmail.com] please reopen this ticket if you still hit this issue, thanks!
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16223837#comment-16223837 ]

Apache Spark commented on SPARK-17788:
--------------------------------------

User 'cloud-fan' has created a pull request for this issue:
https://github.com/apache/spark/pull/18251
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15700440#comment-15700440 ]

Herman van Hovell commented on SPARK-17788:
-------------------------------------------

[~babak.alip...@gmail.com] A few questions:
- Is it possible to get a reproducible piece of code?
- Could you give us the value of the {{spark.buffer.pageSize}} configuration property? When we allocate the memory for a new record we try to allocate either the page size (which is a Long value) or the size of the record (which is an Int value). The size of the page is larger than the maximum integer value, so this implies the page size is set at a very high value.
- I am also quite surprised that this is not spilling. Could you give us the value of the {{spark.shuffle.spill.numElementsForceSpillThreshold}} configuration property? What is the average row size?
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15700013#comment-15700013 ]

Babak Alipour commented on SPARK-17788:
---------------------------------------

No, I didn't change that conf. I did try to change `spark.executor.memory` to various values ranging from 8g to 64g; nothing changes and I get the same exception.
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15699276#comment-15699276 ]

Wenchen Fan commented on SPARK-17788:
-------------------------------------

After looking at the code, it seems the only way to trigger this exception is by setting `spark.buffer.pageSize` to a value larger than `((1L << 31) - 1) * 8L`. [~babak.alip...@gmail.com] did you set this conf?
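The limit quoted in the comment above can be checked with a little arithmetic: `((1L << 31) - 1) * 8L` evaluates to exactly the 17179869176 bytes named in the exception message. The sketch below only reproduces that calculation; `PageSizeLimit` and `exceedsLimit` are hypothetical names chosen for illustration and are not Spark's own code.

```scala
object PageSizeLimit {
  // (2^31 - 1) eight-byte words, i.e. the expression quoted in the comment above.
  val maxPageSizeBytes: Long = ((1L << 31) - 1) * 8L

  // Hypothetical helper: true when a requested page would exceed that limit.
  def exceedsLimit(requestedBytes: Long): Boolean =
    requestedBytes > maxPageSizeBytes

  def main(args: Array[String]): Unit = {
    println(maxPageSizeBytes)                     // prints 17179869176
    println(exceedsLimit(64L * 1024 * 1024))      // a 64 MB page: prints false
    println(exceedsLimit(maxPageSizeBytes + 1L))  // one byte over: prints true
  }
}
```

Because the value in the error message matches this expression exactly, the exception reports the upper bound itself rather than the size that was actually requested.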
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15698037#comment-15698037 ]

Wenchen Fan commented on SPARK-17788:
-------------------------------------

Should we investigate this?
{code}
Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
{code}
Although some partitions can be very large, Spark should be able to process them (slowly) instead of throwing an exception.
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15696321#comment-15696321 ]

Herman van Hovell commented on SPARK-17788:
-------------------------------------------

That is fair. The solution is not that straightforward TBH:
- Always add some kind of tie-breaking value to the range. This could be random, but I'd rather add something like monotonically_increasing_id(). This always incurs some cost.
- Only add a tie-breaker when you suspect skew. Here we need to add some heavy hitter algorithm, which is potentially much more resource intensive than reservoir sampling. The other thing is that when we suspect skew, we would need to scan the data again (which would bring the total number of scans to 3).

So I would be slightly in favor of option 1 and a flag to disable it.
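To make the first option above concrete, here is a minimal sketch in plain Scala (no Spark dependency) of how a tie-breaking value changes the outcome of range partitioning. It is a simplified model, not Spark's RangePartitioner: the bounds are taken from the fully sorted keys rather than from a sample, and `TieBreakerDemo`, `bounds`, `partitionOf` and `sizes` are hypothetical names. The row index stands in for something like monotonically_increasing_id().

```scala
object TieBreakerDemo {
  // (sort value, row id); the id is only used to break ties between equal values.
  type Key = (Double, Long)

  def lessThan(a: Key, b: Key): Boolean =
    a._1 < b._1 || (a._1 == b._1 && a._2 < b._2)

  // Pick numPartitions - 1 evenly spaced upper bounds from the sorted keys.
  def bounds(keys: Seq[Key], numPartitions: Int): Vector[Key] = {
    val sorted = keys.sortWith(lessThan).toVector
    (1 until numPartitions)
      .map(i => sorted(i * sorted.length / numPartitions - 1))
      .toVector
  }

  // A key goes to the first partition whose upper bound is >= the key;
  // keys above every bound go to the last partition.
  def partitionOf(key: Key, bs: Vector[Key]): Int = {
    val i = bs.indexWhere(b => !lessThan(b, key))
    if (i == -1) bs.length else i
  }

  def sizes(keys: Seq[Key], numPartitions: Int): Vector[Int] = {
    val bs = bounds(keys, numPartitions)
    val counts = Array.fill(numPartitions)(0)
    keys.foreach(k => counts(partitionOf(k, bs)) += 1)
    counts.toVector
  }

  // 70 of 100 rows share the value 0.5; the other 30 values are 1.0 to 30.0.
  val values: Seq[Double] = Seq.fill(70)(0.5) ++ (1 to 30).map(_.toDouble)

  // Without a tie-breaker every row carries id 0, so equal values cannot be split.
  val withoutTieBreaker: Vector[Int] = sizes(values.map(v => (v, 0L)), 4)

  // With a tie-breaker each row gets a distinct id.
  val withTieBreaker: Vector[Int] =
    sizes(values.zipWithIndex.map { case (v, i) => (v, i.toLong) }, 4)

  def main(args: Array[String]): Unit = {
    println(withoutTieBreaker) // prints Vector(70, 0, 5, 25)
    println(withTieBreaker)    // prints Vector(25, 25, 25, 25)
  }
}
```

In this toy model the tie-breaker costs an extra comparison field on every row, which mirrors the "always incurs some cost" remark; whether that cost is acceptable in Spark itself is exactly the trade-off discussed in the comment.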
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15696188#comment-15696188 ]

holdenk commented on SPARK-17788:
---------------------------------

I don't think this is a duplicate - it's related, but a join doesn't necessarily use a range partitioner and sortBy is a different operation. I agree the potential solution could share a lot of the same underlying implementation.
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15696154#comment-15696154 ]

Herman van Hovell commented on SPARK-17788:
-------------------------------------------

I am closing this one as a duplicate. Feel free to reopen if you disagree.
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
    [ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15696134#comment-15696134 ]

Herman van Hovell commented on SPARK-17788:
-------------------------------------------

Spark makes a sketch of your data as soon as you want to order the entire dataset. Based on that sketch Spark tries to create equally sized partitions. As [~holdenk] said, your problem is caused by skew (a lot of rows with the same key), and none of the current partitioning schemes can help you with this. In the short run, you could follow her suggestion and add noise to the order (this only works for global ordering and not for joins/aggregation with skewed values). In the long run, there is an ongoing effort to reduce skew for joining, see SPARK-9862 for more information.

I have created the following small Spark program to illustrate how range partitioning works:
{noformat}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{lit, rand, when}
import spark.implicits._ // already in scope in spark-shell

// Set the partitions and parallelism to a relatively low value so we can read the results.
spark.conf.set("spark.default.parallelism", "20")
spark.conf.set("spark.sql.shuffle.partitions", "20")

// Create a skewed data frame: 80% of the rows are scaled to [0, 1), the remaining 20% to [0, 10).
// 10 million rows, matching the total of the per-partition counts below.
val df = spark
  .range(10000000)
  .select(
    $"id",
    (rand(34) * when($"id" % 10 <= 7, lit(1.0)).otherwise(lit(10.0))).as("value"))

// Make a summary per partition. The partition intervals should not overlap and the number of
// elements in a partition should roughly be the same for all partitions.
case class PartitionSummary(count: Long, min: Double, max: Double, range: Double)
val res = df.orderBy($"value").mapPartitions { iterator =>
  val (count, min, max) =
    iterator.foldLeft((0L, Double.PositiveInfinity, Double.NegativeInfinity)) {
      case ((count, min, max), Row(_, value: Double)) =>
        (count + 1L, Math.min(min, value), Math.max(max, value))
    }
  Iterator.single(PartitionSummary(count, min, max, max - min))
}

// Get results and make them look nice
res.orderBy($"min")
  .select(
    $"count",
    $"min".cast("decimal(5,3)"),
    $"max".cast("decimal(5,3)"),
    $"range".cast("decimal(5,3)"))
  .show(30)
{noformat}

This yields the following results (notice how the partition range varies and the row count is relatively similar):
{noformat}
+------+-----+------+-----+
| count|  min|   max|range|
+------+-----+------+-----+
|484005|0.000| 0.059|0.059|
|426212|0.059| 0.111|0.052|
|381796|0.111| 0.157|0.047|
|519954|0.157| 0.221|0.063|
|496842|0.221| 0.281|0.061|
|539082|0.281| 0.347|0.066|
|516798|0.347| 0.410|0.063|
|558487|0.410| 0.478|0.068|
|419825|0.478| 0.529|0.051|
|402257|0.529| 0.578|0.049|
|557225|0.578| 0.646|0.068|
|518626|0.646| 0.710|0.063|
|611478|0.710| 0.784|0.075|
|544556|0.784| 0.851|0.066|
|454356|0.851| 0.906|0.055|
|450535|0.906| 0.961|0.055|
|575996|0.961| 2.290|1.329|
|525915|2.290| 4.920|2.630|
|518757|4.920| 7.510|2.590|
|497298|7.510|10.000|2.490|
+------+-----+------+-----+
{noformat}
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
[ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15695956#comment-15695956 ]

holdenk commented on SPARK-17788:

This is semi-expected behaviour: the range partitioner (and really all Spark partitioners) doesn't support splitting a single key across partitions (e.g. if 70% of your data has the same key and you are partitioning on that key, 70% of that data is going to end up in the same partition). We could try to fix this in a few ways: by having Spark SQL do something special in this case, by having Spark's sortBy automatically add "noise" to the key when the sampling indicates there is too much data for a given key, or by allowing partitioners to be non-deterministic and updating the general sortBy logic in Spark. I think this would be something good for us to consider, but it's probably going to take a while (and certainly not in time for 2.1.0).

> RangePartitioner results in few very large tasks and many small to empty tasks
> ---
>
> Key: SPARK-17788
> URL: https://issues.apache.org/jira/browse/SPARK-17788
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 2.0.0
> Environment: Ubuntu 14.04 64bit
> Java 1.8.0_101
> Reporter: Babak Alipour
>
> Greetings everyone,
> I was trying to read a single field of a Hive table stored as Parquet in
> Spark (~140GB for the entire table, this single field is a Double, ~1.4B
> records) and look at the sorted output using the following:
> sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC")
> But this simple line of code gives:
> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with
> more than 17179869176 bytes
> Same error for:
> sql("SELECT " + field + " FROM MY_TABLE").sort(field)
> and:
> sql("SELECT " + field + " FROM MY_TABLE").orderBy(field)
> After doing some searching, the issue seems to lie in the RangePartitioner
> trying to create equal ranges. [1]
> [1] https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/RangePartitioner.html
>
> The Double values I'm trying to sort are mostly in the range [0,1] (~70% of
> the data, which roughly equates to 1 billion records); other numbers in the
> dataset are as high as 2000. With the RangePartitioner trying to create equal
> ranges, some tasks are becoming almost empty while others are extremely
> large, due to the heavily skewed distribution.
> This is either a bug in Apache Spark or a major limitation of the framework.
> I hope one of the devs can help solve this issue.
> P.S. Email thread on Spark user mailing list:
> http://mail-archives.apache.org/mod_mbox/spark-user/201610.mbox/%3CCA%2B_of14hTVYTUHXC%3DmS9Kqd6qegVvkoF-ry3Yj2%2BRT%2BWSBNzhg%40mail.gmail.com%3E

--
This message was sent by Atlassian JIRA (v6.3.4#6332)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
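The effect holdenk describes, and the "noise" idea, can be illustrated without a cluster. The following is a minimal sketch, not Spark's implementation: `range_bounds`, `partition_sizes` and `demo` are hypothetical helpers, the bounds are taken from a simple random sample, and the data set (70% of rows sharing the single key 0.5, the rest uniform up to 2000) is an assumption chosen to mirror the scenario in the comment. It assigns each key to the first range whose upper bound is not below it, first on the plain value and then on a `(value, random tie-breaker)` pair.

```python
import bisect
import random


def range_bounds(sample, num_partitions):
    """Pick num_partitions - 1 upper bounds at evenly spaced ranks of a sorted sample."""
    ordered = sorted(sample)
    return [ordered[len(ordered) * i // num_partitions]
            for i in range(1, num_partitions)]


def partition_sizes(keys, num_partitions, rng, sample_size=1000):
    """Count how many keys land in each range partition."""
    sample = rng.sample(keys, min(sample_size, len(keys)))
    bounds = range_bounds(sample, num_partitions)
    sizes = [0] * num_partitions
    for key in keys:
        # Partition index = number of bounds strictly below the key, so
        # equal keys can never be split across partitions.
        sizes[bisect.bisect_left(bounds, key)] += 1
    return sizes


def demo(n=100_000, num_partitions=10, seed=42):
    rng = random.Random(seed)
    hot = n * 7 // 10
    # Assumed data: 70% of rows share one key, the rest are spread out.
    values = [0.5] * hot + [rng.uniform(1.0, 2000.0) for _ in range(n - hot)]
    skewed = partition_sizes(values, num_partitions, rng)
    # "Noise": a random second component that only breaks ties between equal values.
    salted_keys = [(v, rng.random()) for v in values]
    salted = partition_sizes(salted_keys, num_partitions, rng)
    return skewed, salted


if __name__ == "__main__":
    skewed, salted = demo()
    print("plain keys: ", skewed)
    print("salted keys:", salted)
```

With plain keys, every row holding the hot value falls into one partition and several neighbouring partitions stay empty, because their bounds are all equal to that value. With the tie-breaker, the bounds can fall inside the run of equal values, so the sizes even out, and the overall order on the original value is unchanged because the noise only decides between rows that compare equal anyway.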
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
[ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15664035#comment-15664035 ]

Babak Alipour commented on SPARK-17788:

The details were in the email thread. Here's the full stack trace:

Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
  at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
  at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
  at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
  at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:85)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
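The byte count in the exception message can be reproduced with simple arithmetic: it equals (2^31 - 1) × 8, which is 8 bytes short of 16 GiB. Reading this as a cap of 2^31 - 1 eight-byte words per page is an assumption made here for illustration; the thread itself does not explain where the number comes from. The names `MAX_PAGE_BYTES` and `SIXTEEN_GIB` below are hypothetical.

```python
# Assumed origin of the limit: (2**31 - 1) eight-byte words per page.
MAX_PAGE_BYTES = ((1 << 31) - 1) * 8
SIXTEEN_GIB = 16 * 2**30

print(MAX_PAGE_BYTES)                # 17179869176, the figure in the exception
print(SIXTEEN_GIB - MAX_PAGE_BYTES)  # 8
```

Under that reading, a single sort task that tries to grow one buffer to roughly 16 GiB would hit this error, which fits the report of a few extremely large tasks.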
[jira] [Commented] (SPARK-17788) RangePartitioner results in few very large tasks and many small to empty tasks
[ https://issues.apache.org/jira/browse/SPARK-17788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15656549#comment-15656549 ]

Wenchen Fan commented on SPARK-17788:

can you provide the full stacktrace? thanks!