Spark repartition question...

2017-04-30 Thread Muthu Jayakumar
Hello there,

I am trying to understand the difference between the following
repartition() overloads...
a. def repartition(partitionExprs: Column*): Dataset[T]
b. def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
c. def repartition(numPartitions: Int): Dataset[T]

My understanding is that (c) is a simple hash-based partitioner where the
records are distributed roughly evenly across numPartitions.
(a) is more like (c), except that the numPartitions depends on the distinct
column values from the expression. Right?
(b) is similar to (a), but what does numPartitions mean here?

On a side note, from the source code it seems like (a) and (b) both use
RepartitionByExpression, and my guess is that (a) defaults numPartitions
to 200 (the default value of spark.sql.shuffle.partitions).
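
For reference, a minimal sketch of how I have been poking at this (assuming
Spark 2.x with a SparkSession named spark; the dataset is made up):

  import org.apache.spark.sql.functions.col

  // Made-up dataset with 20 distinct values in cat_col.
  val df = spark.range(0, 1000).withColumn("cat_col", col("id") % 20)

  // (c) explicit number of partitions, no partitioning expression
  println(df.repartition(50).rdd.getNumPartitions)                 // 50

  // (a) partition by expression only; per my guess above, this should show
  // spark.sql.shuffle.partitions (200 by default)
  println(df.repartition(col("cat_col")).rdd.getNumPartitions)

  // (b) partition by expression into an explicit number of partitions
  println(df.repartition(50, col("cat_col")).rdd.getNumPartitions) // 50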

Reason for my question...
Say df.repartition(50, col("cat_col")), and the distinct `cat_col` values in
the df number about 20. Would the effective number of partitions still be 50?
And if it is 50, would the 20 distinct values most likely each get their own
partition bucket, with some values possibly spilling over into the remaining
30 buckets... Is this loosely correct?
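
If it helps, this is how I was planning to verify it (continuing the made-up
df from the sketch above):

  val repartitioned = df.repartition(50, col("cat_col"))

  // Count rows per partition to see how many of the 50 partitions
  // actually end up holding data.
  val sizes = repartitioned.rdd
    .mapPartitions(it => Iterator(it.size))
    .collect()

  println(s"partitions: ${sizes.length}, non-empty: ${sizes.count(_ > 0)}")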

The reason for my question is that I am trying to work with an amount of data
that would not fit in memory across all the workers in the cluster at once. But
if I repartition the data in some logical manner, then each partition should
fit in the heap, so I can perform some useful joins and write the result back
to Parquet (or another useful datastore).
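
Roughly, the workflow I have in mind looks like this (a sketch only, with
made-up dataset and column names):

  import org.apache.spark.sql.functions.col

  // Repartition both sides by the join key so rows with the same key land
  // in the same partitions before the join.
  val left  = largeDf.repartition(200, col("join_key"))
  val right = otherDf.repartition(200, col("join_key"))

  val joined = left.join(right, "join_key")

  joined.write.mode("overwrite").parquet("/path/to/output")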

Please advise,
Muthu


Re: Repartition question

2015-08-04 Thread Richard Marscher
Hi,

It is possible to control the number of partitions for the RDD without
calling repartition by setting the max split size for the Hadoop input
format used. Tracing through the code, XmlInputFormat extends
FileInputFormat, which determines the number of splits (which NewHadoopRDD
uses to determine the number of partitions:
https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L95)
from a few configs:
https://github.com/apache/hadoop/blob/branch-2.3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L200

  public static final String SPLIT_MAXSIZE =
      "mapreduce.input.fileinputformat.split.maxsize";

  public static final String SPLIT_MINSIZE =
      "mapreduce.input.fileinputformat.split.minsize";
If you are setting these via SparkConf, prefix the keys with spark.hadoop.
and they will end up on the Hadoop conf used for the above values.
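
For example, something along these lines should do it (just a sketch, untested;
the app name and the 16 MB cap are arbitrary):

  import org.apache.spark.{SparkConf, SparkContext}

  // Properties prefixed with "spark.hadoop." are copied onto the Hadoop
  // Configuration, so FileInputFormat will cap each split (and therefore
  // each partition) at roughly 16 MB here.
  val conf = new SparkConf()
    .setAppName("wikipedia-parse")
    .set("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize",
      (16 * 1024 * 1024).toString)

  val sc = new SparkContext(conf)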

On Tue, Aug 4, 2015 at 12:31 AM, Naveen Madhire vmadh...@umail.iu.edu
wrote:

 Hi All,

 I am running the Wikipedia parsing example from the Advanced
 Analytics with Spark book.


 https://github.com/sryza/aas/blob/d3f62ef3ed43a59140f4ae8afbe2ef81fc643ef2/ch06-lsa/src/main/scala/com/cloudera/datascience/lsa/ParseWikipedia.scala#l112


 The partitions of the RDD returned by the readFile function (linked
 above) are about 32 MB each. So for a 100 MB file, the RDD gets
 created with 4 partitions of approximately 32 MB each.


 I am running this in standalone Spark cluster mode. Everything is working
 fine; I am just a little confused about the number of partitions and their size.

 I want to increase the number of partitions of the RDD to make better use of
 the cluster. Is calling repartition() afterwards the only option, or can I pass
 something into the above method to get more partitions in the RDD?

 Please let me know.

 Thanks.




-- 
Richard Marscher
Software Engineer
Localytics


Repartition question

2015-08-03 Thread Naveen Madhire
Hi All,

I am running the Wikipedia parsing example from the Advanced
Analytics with Spark book.

https://github.com/sryza/aas/blob/d3f62ef3ed43a59140f4ae8afbe2ef81fc643ef2/ch06-lsa/src/main/scala/com/cloudera/datascience/lsa/ParseWikipedia.scala#l112


The partitions of the RDD returned by the readFile function (linked
above) are about 32 MB each. So for a 100 MB file, the RDD gets
created with 4 partitions of approximately 32 MB each.


I am running this in standalone Spark cluster mode. Everything is working
fine; I am just a little confused about the number of partitions and their size.

I want to increase the number of partitions of the RDD to make better use of
the cluster. Is calling repartition() afterwards the only option, or can I pass
something into the above method to get more partitions in the RDD?
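
For context, this is roughly how I am checking it (a sketch; rawXmls below is
just a made-up name for the RDD that readFile returns):

  // rawXmls is the RDD returned by the readFile function above
  println(rawXmls.partitions.size)        // prints 4 for my ~100 MB file

  // Is repartitioning after the fact the only way to get more partitions?
  val moreParts = rawXmls.repartition(16)
  println(moreParts.partitions.size)      // 16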

Please let me know.

Thanks.