Re: Parquet number of partitions

2015-05-07 Thread Eric Eijkelenboom
Funny enough, I observe different behaviour on EC2 vs EMR (Spark on EMR 
installed with 
https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark 
). Both 
with Spark 1.3.1/Hadoop 2.

Reading a folder with 12 Parquet files gives me the following:

On EC2: 
scala> val logs = sqlContext.parquetFile("s3n://mylogs/")
...
scala> logs.rdd.partitions.length
15/05/07 11:45:50 INFO ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(125716) called with 
curMem=0, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0 stored as values in 
memory (estimated size 122.8 KB, free 265.3 MB)
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(19128) called with 
curMem=125716, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in 
memory (estimated size 18.7 KB, free 265.3 MB)
15/05/07 11:45:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
ip-10-31-82-233.ec2.internal:39894 (size: 18.7 KB, free: 265.4 MB)
15/05/07 11:45:51 INFO BlockManagerMaster: Updated info of block 
broadcast_0_piece0
15/05/07 11:45:51 INFO SparkContext: Created broadcast 0 from NewHadoopRDD at 
newParquet.scala:478
15/05/07 11:45:51 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side 
Metadata Split Strategy
res0: Int = 12

On EMR:
scala> val logs = sqlContext.parquetFile("s3n://mylogs/")
...
scala> logs.rdd.partitions.length
15/05/07 11:46:53 INFO parquet.ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(266059) called with 
curMem=287247, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1 stored as values 
in memory (estimated size 259.8 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(21188) called with 
curMem=553306, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as 
bytes in memory (estimated size 20.7 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in 
memory on ip-10-203-174-61.ec2.internal:52570 (size: 20.7 KB, free: 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerMaster: Updated info of block 
broadcast_1_piece0
15/05/07 11:46:53 INFO spark.SparkContext: Created broadcast 1 from 
NewHadoopRDD at newParquet.scala:478
15/05/07 11:46:54 INFO parquet.ParquetRelation2$$anon$1$$anon$2: Using Task 
Side Metadata Split Strategy
res3: Int = 138

138 (!) partitions on EMR and 12 partitions on EC2 (the same as the number of 
files). I’m reading from the exact same folder on S3.

This leads me to believe that there might be some configuration settings which 
control how partitioning happens. Could that be the case?
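
One thing I'd like to check is whether the split-related Hadoop settings differ 
between the two clusters (I'm only guessing that these are the relevant knobs), 
e.g. from the shell:

scala> val hconf = sc.hadoopConfiguration
scala> hconf.get("mapreduce.input.fileinputformat.split.maxsize")  // max split size, null if unset
scala> hconf.get("mapreduce.input.fileinputformat.split.minsize")  // min split size, null if unset
scala> hconf.get("fs.s3n.block.size")  // block size reported for s3n:// files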

Insights would be greatly appreciated. 

Best, Eric



> On 07 May 2015, at 09:31, Archit Thakur  wrote:
> 
> Hi.
> The number of partitions is determined by the RDD used in the plan it creates. 
> It uses NewHadoopRDD, which gets its partitions from getSplits of the input 
> format it is using, in this case FilteringParquetRowInputFormat, a subclass of 
> ParquetInputFormat. To change the number of partitions, write a new input 
> format and have the NewHadoopRDD in the plan use it, or, if you are willing to 
> shuffle, use the repartition API without any code change.
> 
> Thanks & Regards.
> 
> On Tue, May 5, 2015 at 7:56 PM, Masf wrote:
> Hi Eric.
> 
> Q1:
> When I read Parquet files, I've observed that Spark generates as many 
> partitions as there are Parquet files in the path.
> 
> Q2:
> To reduce the number of partitions you can use rdd.repartition(x), where x is 
> the number of partitions. Depending on your case, repartition could be a heavy 
> task.
> 
> 
> Regards.
> Miguel.
> 
> On Tue, May 5, 2015 at 3:56 PM, Eric Eijkelenboom 
> <eric.eijkelenb...@gmail.com> wrote:
> Hello guys
> 
> Q1: How does Spark determine the number of partitions when reading a Parquet 
> file?
> 
> val df = sqlContext.parquetFile(path)
> 
> Is it in some way related to the number of Parquet row groups in my input?
> 
> Q2: How can I reduce this number of partitions? Doing this:
> 
> df.rdd.coalesce(200).count
> 
> from the spark-shell causes job execution to hang…
> 
> Any ideas? Thank you in advance.
> 
> Eric
> 
> 
> 
> 
> -- 
> 
> 
> Regards.
> Miguel Ángel
> 



Re: Parquet number of partitions

2015-05-07 Thread Archit Thakur
Hi.
The number of partitions is determined by the RDD used in the plan it creates.
It uses NewHadoopRDD, which gets its partitions from getSplits of the input
format it is using, in this case FilteringParquetRowInputFormat, a subclass of
ParquetInputFormat. To change the number of partitions, write a new input
format and have the NewHadoopRDD in the plan use it, or, if you are willing to
shuffle, use the repartition API without any code change.
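
For example, a rough sketch of the shuffle route (12 is just an example count,
reusing the path from Eric's mail):

val logs = sqlContext.parquetFile("s3n://mylogs/")
// shuffle the data into a fixed number of partitions
val repartitioned = logs.rdd.repartition(12)
// or coalesce, which avoids a shuffle but may give uneven partitions
val coalesced = logs.rdd.coalesce(12)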

Thanks & Regards.

On Tue, May 5, 2015 at 7:56 PM, Masf  wrote:

> Hi Eric.
>
> Q1:
> When I read Parquet files, I've observed that Spark generates as many
> partitions as there are Parquet files in the path.
>
> Q2:
> To reduce the number of partitions you can use rdd.repartition(x), where x is
> the number of partitions. Depending on your case, repartition could be a
> heavy task.
>
>
> Regards.
> Miguel.
>
> On Tue, May 5, 2015 at 3:56 PM, Eric Eijkelenboom <
> eric.eijkelenb...@gmail.com> wrote:
>
>> Hello guys
>>
>> Q1: How does Spark determine the number of partitions when reading a
>> Parquet file?
>>
>> val df = sqlContext.parquetFile(path)
>>
>> Is it in some way related to the number of Parquet row groups in my input?
>>
>> Q2: How can I reduce this number of partitions? Doing this:
>>
>> df.rdd.coalesce(200).count
>>
>> from the spark-shell causes job execution to hang…
>>
>> Any ideas? Thank you in advance.
>>
>> Eric
>>
>>
>
>
> --
>
>
> Regards.
> Miguel Ángel
>


Re: Parquet number of partitions

2015-05-05 Thread Masf
Hi Eric.

Q1:
When I read Parquet files, I've observed that Spark generates as many
partitions as there are Parquet files in the path.

Q2:
To reduce the number of partitions you can use rdd.repartition(x), where x is
the number of partitions. Depending on your case, repartition could be a heavy
task.
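
For example (50 is just an illustrative number here):

val df = sqlContext.parquetFile(path)
val reduced = df.rdd.repartition(50)  // full shuffle into 50 partitions
reduced.partitions.length             // should now report 50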


Regards.
Miguel.

On Tue, May 5, 2015 at 3:56 PM, Eric Eijkelenboom <
eric.eijkelenb...@gmail.com> wrote:

> Hello guys
>
> Q1: How does Spark determine the number of partitions when reading a
> Parquet file?
>
> val df = sqlContext.parquetFile(path)
>
> Is it in some way related to the number of Parquet row groups in my input?
>
> Q2: How can I reduce this number of partitions? Doing this:
>
> df.rdd.coalesce(200).count
>
> from the spark-shell causes job execution to hang…
>
> Any ideas? Thank you in advance.
>
> Eric
>
>


-- 


Regards.
Miguel Ángel


Parquet number of partitions

2015-05-05 Thread Eric Eijkelenboom
Hello guys

Q1: How does Spark determine the number of partitions when reading a Parquet 
file?

val df = sqlContext.parquetFile(path)

Is it in some way related to the number of Parquet row groups in my input?

Q2: How can I reduce this number of partitions? Doing this:

df.rdd.coalesce(200).count

from the spark-shell causes job execution to hang… 

Any ideas? Thank you in advance. 

Eric