You can get more insight by using the Spark history server (http://spark.apache.org/docs/latest/monitoring.html). It can show you which task is failing, along with other information that might help you debug the issue.

On 05/10/2016 19:00, Babak Alipour wrote:
> The issue seems to lie in the RangePartitioner trying to create equal
> ranges. [1]
>
> [1] https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/RangePartitioner.html
>
> The Double values I'm trying to sort are mostly in the range [0,1]
> (~70% of the data, which roughly equates to 1 billion records); other
> numbers in the dataset are as high as 2000. With the RangePartitioner
> trying to create equal ranges, some tasks are becoming almost empty
> while others are extremely large, due to the heavily skewed distribution.
>
> This is either a bug in Apache Spark or a major limitation of the
> framework. Has anyone else encountered this?
>
> Babak Alipour,
> University of Florida
>
> On Sun, Oct 2, 2016 at 1:38 PM, Babak Alipour <babak.alip...@gmail.com> wrote:
>
> Thanks Vadim for sharing your experience, but I have tried a
> multi-JVM setup (2 workers), various sizes for
> spark.executor.memory (8g, 16g, 20g, 32g, 64g) and
> spark.executor.cores (2-4), same error all along.
>
> As for the files, these are all .snappy.parquet files, resulting
> from inserting some data from other tables. None of them actually
> exceeds 25MiB (I don't know why this number). Setting the DataFrame
> to persist using StorageLevel.MEMORY_ONLY shows size in memory at
> ~10g. I still cannot understand why it is trying to create such a
> big page when sorting. The entire column (this df has only 1
> column) is not that big, neither are the original files. Any ideas?
>
> Babak
>
> Babak Alipour,
> University of Florida
>
> On Sun, Oct 2, 2016 at 1:45 AM, Vadim Semenov <vadim.seme...@datadoghq.com> wrote:
>
> oh, and try to run even smaller executors, i.e. with
> `spark.executor.memory` <= 16GiB. I wonder what result you're
> going to get.
>
> On Sun, Oct 2, 2016 at 1:24 AM, Vadim Semenov <vadim.seme...@datadoghq.com> wrote:
>
> > Do you mean running a multi-JVM 'cluster' on the single machine?
> Yes, that's what I suggested.
>
> You can get some information here:
> http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
>
> > How would that affect performance/memory-consumption? If
> > a multi-JVM setup can handle such a large input, then why
> > can't a single-JVM break down the job into smaller tasks?
> I don't have an answer to these questions; it requires an
> understanding of Spark, the JVM, and the internals of your setup.
>
> I ran into the same issue only once, when I tried to read a
> gzipped file whose size was >16GiB. That's the only time I hit this:
> https://github.com/apache/spark/blob/5d84c7fd83502aeb551d46a740502db4862508fe/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L238-L243
> In the end I had to recompress my file into bzip2, which is
> splittable, to be able to read it with Spark.
>
> I'd look into the size of your files, and if they're huge I'd
> try to connect the error you got to the size of the files
> (but it's strange to me, as a block size of a Parquet file
> is 128MiB). I don't have any other suggestions, I'm sorry.
>
> On Sat, Oct 1, 2016 at 11:35 PM, Babak Alipour <babak.alip...@gmail.com> wrote:
>
> Do you mean running a multi-JVM 'cluster' on the
> single machine? How would that affect
> performance/memory-consumption? If a multi-JVM setup
> can handle such a large input, then why can't a
> single-JVM break down the job into smaller tasks?
>
> I also found that SPARK-9411 mentions making the
> page_size configurable, but it's hard-limited
> to ((1L<<31) -1) *8L [1]
>
> [1] https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
>
> SPARK-9452 also talks about larger page sizes, but I
> don't know how that affects my use case. [2]
>
> [2] https://github.com/apache/spark/pull/7891
>
> The reason provided here is that the on-heap
> allocator's maximum page size is limited by the
> maximum amount of data that can be stored in a long[].
> Is it possible to force this specific operation to go
> off-heap so that it can possibly use a bigger page size?
>
> Babak
>
> Babak Alipour,
> University of Florida
>
> On Fri, Sep 30, 2016 at 3:03 PM, Vadim Semenov <vadim.seme...@datadoghq.com> wrote:
>
> Run more smaller executors: change
> `spark.executor.memory` to 32g and
> `spark.executor.cores` to 2-4, for example.
>
> Changing driver's memory won't help because it
> doesn't participate in execution.
>
> On Fri, Sep 30, 2016 at 2:58 PM, Babak Alipour <babak.alip...@gmail.com> wrote:
>
> Thank you for your replies.
>
> @Mich, using LIMIT 100 in the query prevents
> the exception, but given the fact that there's
> enough memory, I don't think this should
> happen even without LIMIT.
>
> @Vadim, here's the full stack trace:
>
> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
>     at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
>     at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
>     at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> I'm running Spark in local mode, so there is
> only one executor, the driver,
> and spark.driver.memory is set to 64g.
> Changing the driver's memory doesn't help.
>
> Babak Alipour,
> University of Florida
>
> On Fri, Sep 30, 2016 at 2:05 PM, Vadim Semenov <vadim.seme...@datadoghq.com> wrote:
>
> Can you post the whole exception stack trace?
> What are your executor memory settings?
>
> Right now I assume that it happens in
> UnsafeExternalRowSorter -> UnsafeExternalSorter:insertRecord
>
> Running more executors with lower
> `spark.executor.memory` should help.
>
> On Fri, Sep 30, 2016 at 12:57 PM, Babak Alipour <babak.alip...@gmail.com> wrote:
>
> Greetings everyone,
>
> I'm trying to read a single field of a
> Hive table stored as Parquet in Spark
> (~140GB for the entire table, this
> single field should be just a few GB)
> and look at the sorted output using
> the following:
>
> sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC")
>
> But this simple line of code gives:
>
> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
>
> Same error for:
>
> sql("SELECT " + field + " FROM MY_TABLE").sort(field)
>
> and:
>
> sql("SELECT " + field + " FROM MY_TABLE").orderBy(field)
>
> I'm running this on a machine with
> more than 200GB of RAM, running in
> local mode with spark.driver.memory
> set to 64g.
>
> I do not know why it cannot allocate a
> big enough page, and why is it trying
> to allocate such a big page in the
> first place?
>
> I hope someone with more knowledge of
> Spark can shed some light on this.
>
> Thank you!
>
> Best regards,
> Babak Alipour,
> University of Florida
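
A side note on the number in the exception: 17179869176 bytes is exactly what the expression quoted in the thread, ((1L<<31) -1) *8L, evaluates to. Below is a minimal sketch of that arithmetic, assuming (as the thread describes) that the bound is the maximum length of a long[] times 8 bytes per element. The class and method names (PageSizeLimit, maxPageSizeBytes) are made up for illustration and are not part of Spark.

```java
public final class PageSizeLimit {
    // Assumption taken from the thread: an on-heap page is backed by a long[],
    // so its size is capped by the largest array length, (1L << 31) - 1 elements.
    static final long MAX_LONG_ARRAY_LENGTH = (1L << 31) - 1;

    // Each long occupies 8 bytes.
    static final long BYTES_PER_LONG = 8L;

    // Hypothetical helper: evaluates ((1L << 31) - 1) * 8L.
    public static long maxPageSizeBytes() {
        return MAX_LONG_ARRAY_LENGTH * BYTES_PER_LONG;
    }

    public static void main(String[] args) {
        long limit = maxPageSizeBytes();
        // Prints 17179869176, the value reported in the exception message.
        System.out.println(limit);
        // The same limit in GiB: 8 bytes short of 16 GiB.
        System.out.println(limit / (1024.0 * 1024 * 1024) + " GiB");
    }
}
```

So the limit sits 8 bytes below 16 GiB, which lines up with the >16GiB gzipped file and the `spark.executor.memory` <= 16GiB suggestion mentioned in the thread. It does not by itself explain why a single sort task asks for a page that large.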