[Spark 2.1.0] Spark SQL return correct count, but NULL on all fields

2017-02-08 Thread Babak Alipour
Hi everyone,

I'm using Spark with Hive support enabled; the data is stored in Parquet
format at a fixed location.
I just downloaded Spark 2.1.0 and it broke my Spark SQL queries: count(*)
still returns the correct count, but all columns show as "NULL". Everything
worked fine on 1.6 & 2.0.x.

I'm guessing it has to do with SPARK-18360: "The default table path of tables
in the default database will be under the location of the default database
instead of always depending on the warehouse location setting."
I *want* the table paths to depend on the warehouse location setting, but I
couldn't find a configuration to change the behavior back to what it was
before.
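
In the meantime, a rough workaround sketch of what I'm trying (the table name
and path below are placeholders for my actual setup): pointing the table at
its explicit location, or reading the Parquet files directly, sidesteps the
default-database path resolution altogether.

    // Hypothetical table name and path -- adjust to your layout.
    // Option 1: read the fixed Parquet location directly, bypassing the metastore path.
    val df = spark.read.parquet("hdfs:///warehouse/my_table")

    // Option 2: re-point the existing table at its fixed location and refresh it.
    spark.sql("ALTER TABLE my_table SET LOCATION 'hdfs:///warehouse/my_table'")
    spark.sql("REFRESH TABLE my_table")

I'd still prefer a configuration that restores the old path behavior, if one
exists.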


Best regards,
*Babak Alipour ,*
*University of Florida*


Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-10-02 Thread Babak Alipour
Thanks Vadim for sharing your experience, but I have tried a multi-JVM setup
(2 workers) with various sizes for spark.executor.memory (8g, 16g, 20g, 32g,
64g) and spark.executor.cores (2-4), and I get the same error all along.

As for the files, these are all .snappy.parquet files, produced by inserting
data from other tables. None of them exceeds 25MiB (I don't know why they all
end up around that size). Persisting the DataFrame with
StorageLevel.MEMORY_ONLY shows a size in memory of ~10g. I still cannot
understand why it is trying to create such a big page when sorting. The
entire column (this df has only 1 column) is not that big, and neither are
the original files. Any ideas?
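
In case it helps, here is a minimal sketch of what I'm running (the column
name and partition count are made up): the idea being that spreading the data
over more partitions before the sort means no single task's sorter has to
hold as much.

    import org.apache.spark.sql.functions.desc

    // Hypothetical column name and partition count.
    val df = spark.sql("SELECT my_field FROM MY_TABLE")
    val sorted = df.repartition(400).orderBy(desc("my_field"))
    sorted.write.parquet("/tmp/sorted_output")

I haven't confirmed whether more partitions sidestep the allocation, since I
still don't understand where the oversized page request comes from.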


>Babak



*Babak Alipour ,*
*University of Florida*

On Sun, Oct 2, 2016 at 1:45 AM, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> oh, and try to run even smaller executors, i.e. with
> `spark.executor.memory` <= 16GiB. I wonder what result you're going to get.
>
> On Sun, Oct 2, 2016 at 1:24 AM, Vadim Semenov <vadim.seme...@datadoghq.com
> > wrote:
>
>> > Do you mean running a multi-JVM 'cluster' on the single machine?
>> Yes, that's what I suggested.
>>
>> You can get some information here:
>> http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
>>
>> > How would that affect performance/memory-consumption? If a multi-JVM
>> setup can handle such a large input, then why can't a single-JVM break down
>> the job into smaller tasks?
>> I don't have an answer to these questions; it requires an understanding of
>> Spark internals, the JVM, and your setup.
>>
>> I ran into the same issue only once, when I tried to read a gzipped file
>> whose size was >16GiB. That's the only time I hit this check:
>> https://github.com/apache/spark/blob/5d84c7fd83502aeb551d46a740502db4862508fe/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L238-L243
>> In the end I had to recompress my file into bzip2, which is splittable, so
>> that I could read it with Spark.
>>
>>
>> I'd look into the size of your files, and if they're huge I'd try to
>> connect the error you got to the file sizes (though it's strange to me,
>> since the block size of a Parquet file is 128MiB). I don't have any other
>> suggestions, I'm sorry.
>>
>>
>> On Sat, Oct 1, 2016 at 11:35 PM, Babak Alipour <babak.alip...@gmail.com>
>> wrote:
>>
>>> Do you mean running a multi-JVM 'cluster' on the single machine? How
>>> would that affect performance/memory-consumption? If a multi-JVM setup
>>> can handle such a large input, then why can't a single-JVM break down the
>>> job into smaller tasks?
>>>
>>> I also found that SPARK-9411 mentions making the page_size configurable
>>> but it's hard-limited to ((1L << 31) - 1) * 8L [1]
>>>
>>> [1] https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
>>>
>>> ​Spark-9452 also talks about larger page sizes but I don't know how that
>>> affects my use case.​ [2]
>>>
>>> [2] https://github.com/apache/spark/pull/7891
>>>
>>>
>>> ​The reason provided here is that the on-heap allocator's maximum page
>>> size is limited by the maximum amount of data that can be stored in a
>>> long[]​.
>>> Is it possible to force this specific operation to go off-heap so that
>>> it can possibly use a bigger page size?
>>>
>>>
>>>
>>> ​>Babak​
>>>
>>>
>>> *Babak Alipour ,*
>>> *University of Florida*
>>>
>>> On Fri, Sep 30, 2016 at 3:03 PM, Vadim Semenov <
>>> vadim.seme...@datadoghq.com> wrote:
>>>
>>>> Run more smaller executors: change `spark.executor.memory` to 32g and
>>>> `spark.executor.cores` to 2-4, for example.
>>>>
>>>> Changing driver's memory won't help because it doesn't participate in
>>>> execution.
>>>>
>>>> On Fri, Sep 30, 2016 at 2:58 PM, Babak Alipour <babak.alip...@gmail.com
>>>> > wrote:
>>>>
>>>>> Thank you for your replies.
>>>>>
>>>>> @Mich, using LIMIT 100 in the query prevents the exception but given
>>>>> the fact that there's enough memory, I don't think this should happen even
>>>>> without LIMIT.
>>>>>
>>>>> @Vadim, here's the full stack trace:
>>>>>
>>>>> Caused by: java.lang.IllegalArgumentException: Cannot allocate a

Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-10-01 Thread Babak Alipour
To add one more note, I tried running more, smaller executors, each with
32-64g of memory and executor.cores set to 2-4 (with 2 workers as well), and
I'm still getting the same exception:

java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
    at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:92)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:343)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:393)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
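
For completeness, a sketch of one way those settings can be applied (only one
of the combinations I tried is shown, and the app name is made up; in
practice they could equally come from spark-submit flags or
spark-defaults.conf):

    // Hypothetical app name; executor memory/cores are one of the combinations I tried.
    val spark = org.apache.spark.sql.SparkSession.builder()
      .appName("sort-debug")
      .config("spark.executor.memory", "32g")
      .config("spark.executor.cores", "4")
      .enableHiveSupport()
      .getOrCreate()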

>Babak

*Babak Alipour ,*
*University of Florida*

On Sat, Oct 1, 2016 at 11:35 PM, Babak Alipour <babak.alip...@gmail.com>
wrote:

> Do you mean running a multi-JVM 'cluster' on the single machine? How would
> that affect performance/memory-consumption? If a multi-JVM setup can
> handle such a large input, then why can't a single-JVM break down the job
> into smaller tasks?
>
> I also found that SPARK-9411 mentions making the page_size configurable
> but it's hard-limited to ((1L << 31) - 1) * 8L [1]
>
> [1] https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
>
> ​Spark-9452 also talks about larger page sizes but I don't know how that
> affects my use case.​ [2]
>
> [2] https://github.com/apache/spark/pull/7891
>
>
> ​The reason provided here is that the on-heap allocator's maximum page
> size is limited by the maximum amount of data that can be stored in a
> long[]​.
> Is it possible to force this specific operation to go off-heap so that it
> can possibly use a bigger page size?
>
>
>
> ​>Babak​
>
>
> *Babak Alipour ,*
> *University of Florida*
>
> On Fri, Sep 30, 2016 at 3:03 PM, Vadim Semenov <
> vadim.seme...@datadoghq.com> wrote:
>
>> Run more smaller executors: change `spark.executor.memory` to 32g and
>> `spark.executor.cores` to 2-4, for example.
>>
>> Changing driver's memory won't help because it doesn't participate in
>> execution.
>>
>> On Fri, Sep 30, 2016 at 2:58 PM, Babak Alipour <babak.alip...@gmail.com>
>> wrote:
>>
>>> Thank you for your replies.
>>>
>>> @Mich, using LIMIT 100 in the query prevents the exception but given the
>>> fact that there's enough memory, I don't think this should happen even
>>> without LIMIT.
>>>
>>> @Vadim, here's the full stack trace:
>>>
>>> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
>>>     at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
>>>     at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
>>>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
>>>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
>>>     at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
>>>   

Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-10-01 Thread Babak Alipour
Do you mean running a multi-JVM 'cluster' on the single machine? How would
that affect performance/memory-consumption? If a multi-JVM setup can handle
such a large input, then why can't a single-JVM break down the job into
smaller tasks?

I also found that SPARK-9411 mentions making the page_size configurable but
it's hard-limited to ((1L << 31) - 1) * 8L [1]

[1]
https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
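
That cap works out to exactly the number in the error message, which suggests
the sorter is asking for the largest page the allocator can represent:

    // (2^31 - 1) long slots * 8 bytes each -- the hard page-size cap referenced above.
    val maxPageBytes = ((1L << 31) - 1) * 8L
    // maxPageBytes == 17179869176, i.e. the exact figure in the exception (~16 GiB).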

​Spark-9452 also talks about larger page sizes but I don't know how that
affects my use case.​ [2]

[2] https://github.com/apache/spark/pull/7891


​The reason provided here is that the on-heap allocator's maximum page size
is limited by the maximum amount of data that can be stored in a long[]​.
Is it possible to force this specific operation to go off-heap so that it
can possibly use a bigger page size?
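
If it is, I assume the off-heap allocator would have to be enabled before the
session starts, something along these lines (the size is a guess for my
machine, and I don't know whether the page-size cap is any different
off-heap):

    // Hypothetical sizing; these settings enable Tungsten's off-heap memory.
    val spark = org.apache.spark.sql.SparkSession.builder()
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "64g")
      .enableHiveSupport()
      .getOrCreate()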



​>Babak​


*Babak Alipour ,*
*University of Florida*

On Fri, Sep 30, 2016 at 3:03 PM, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> Run more smaller executors: change `spark.executor.memory` to 32g and
> `spark.executor.cores` to 2-4, for example.
>
> Changing driver's memory won't help because it doesn't participate in
> execution.
>
> On Fri, Sep 30, 2016 at 2:58 PM, Babak Alipour <babak.alip...@gmail.com>
> wrote:
>
>> Thank you for your replies.
>>
>> @Mich, using LIMIT 100 in the query prevents the exception but given the
>> fact that there's enough memory, I don't think this should happen even
>> without LIMIT.
>>
>> @Vadim, here's the full stack trace:
>>
>> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
>>     at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
>>     at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
>>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
>>     at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
>>     at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
>>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
>>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> I'm running spark in local mode so there is only one executor, the driver
>> and spark.driver.memory is set to 64g. Changing the driver's memory doesn't
>> help.
>>
>> *Babak Alipour ,*
>> *University of Florida*
>>
>> On Fri, Sep 30, 2016 at 2:05 PM, Vadim Semenov <
>> vadim.seme...@datadoghq.com> wrote:
>>
>>> Can you post the whole exception stack trace?
>>> What are your executor memory settings?
>>>
>>> Right now I assume that it happens in UnsafeExternalRowSorter ->
>>> UnsafeExternalSorter:insertRecord
>>>
>>> Running more executors with lower `spark.executor.memory` should help.
>>>
>>>
>>> On Fri, Sep 30, 2016 at 12:57 PM, Babak Alipour <babak.alip...@gmail.com
>>> > wrote:
>>>
>>>> Greetings everyone,
>>>>
>>>> I'm trying to read a single field of a Hive tabl

Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-09-30 Thread Babak Alipour
Thank you for your replies.

@Mich, using LIMIT 100 in the query prevents the exception, but given that
there's enough memory, I don't think this should happen even without the
LIMIT.

@Vadim, here's the full stack trace:

Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I'm running Spark in local mode, so there is only one executor (the driver),
and spark.driver.memory is set to 64g. Changing the driver's memory doesn't
help.
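
(To double-check that the 64g actually takes effect in local mode, I look at
the driver JVM's heap directly; a small sketch:)

    // Sanity check: in local mode the driver JVM does all the work, so its max heap
    // should reflect spark.driver.memory (which has to be set at launch time,
    // e.g. via --driver-memory, to matter).
    val maxHeapGiB = Runtime.getRuntime.maxMemory / math.pow(1024, 3)
    println(f"driver max heap: $maxHeapGiB%.1f GiB")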

*Babak Alipour ,*
*University of Florida*

On Fri, Sep 30, 2016 at 2:05 PM, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> Can you post the whole exception stack trace?
> What are your executor memory settings?
>
> Right now I assume that it happens in UnsafeExternalRowSorter ->
> UnsafeExternalSorter:insertRecord
>
> Running more executors with lower `spark.executor.memory` should help.
>
>
> On Fri, Sep 30, 2016 at 12:57 PM, Babak Alipour <babak.alip...@gmail.com>
> wrote:
>
>> Greetings everyone,
>>
>> I'm trying to read a single field of a Hive table stored as Parquet in
>> Spark (~140GB for the entire table, this single field should be just a few
>> GB) and look at the sorted output using the following:
>>
>> sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC")
>>
>> ​But this simple line of code gives:
>>
>> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page
>> with more than 17179869176 bytes
>>
>> Same error for:
>>
>> sql("SELECT " + field + " FROM MY_TABLE).sort(field)
>>
>> and:
>>
>> sql("SELECT " + field + " FROM MY_TABLE).orderBy(field)
>>
>>
>> I'm running this on a machine with more than 200GB of RAM, running in
>> local mode with spark.driver.memory set to 64g.
>>
>> I do not know why it cannot allocate a big enough page, and why is it
>> trying to allocate such a big page in the first place?
>>
>> I hope someone with more knowledge of Spark can shed some light on this.
>> Thank you!
>>
>>
>> *​Best regards,​*
>> *Babak Alipour ,*
>> *University of Florida*
>>
>
>


DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-09-30 Thread Babak Alipour
Greetings everyone,

I'm trying to read a single field of a Hive table stored as Parquet in
Spark (~140GB for the entire table, this single field should be just a few
GB) and look at the sorted output using the following:

sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC")

​But this simple line of code gives:

Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with
more than 17179869176 bytes

Same error for:

sql("SELECT " + field + " FROM MY_TABLE).sort(field)

and:

sql("SELECT " + field + " FROM MY_TABLE).orderBy(field)


I'm running this on a machine with more than 200GB of RAM, running in local
mode with spark.driver.memory set to 64g.

I do not know why it cannot allocate a big enough page. And why is it trying
to allocate such a big page in the first place?

I hope someone with more knowledge of Spark can shed some light on this.
Thank you!


*​Best regards,​*
*Babak Alipour ,*
*University of Florida*