Spark repartition question...
Hello there,

I am trying to understand the difference between the following repartition() overloads:

a. def repartition(partitionExprs: Column*): Dataset[T]
b. def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
c. def repartition(numPartitions: Int): Dataset[T]

My understanding is that (c) is the simple hash-based partitioner, where the records are spread roughly evenly across numPartitions. (a) is like (c), except that the partitioning depends on the distinct column values from the expression, right? (b) is similar to (a), but what does numPartitions mean there?

On a side note, from the source code it looks like (a) and (b) both use RepartitionByExpression, and my guess is that (a) defaults numPartitions to 200 (the default shuffle partition count).

The reason for my question: say I call df.repartition(50, col("cat_col")) and the df has about 20 distinct `cat_col` values. Would the effective number of partitions still be 50? And if it is 50, would the 20 distinct values most likely each get their own partition, with some values possibly spilling into the remaining 30 partitions? Is this loosely correct?

I am asking because I am trying to fit a large amount of data in memory that would not otherwise fit across all the workers in the cluster. If I can repartition the data in some logical manner, I should be able to fit it in the heap, perform some useful joins, and write the result back to Parquet (or another useful datastore).

Please advise,
Muthu
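P.S. Here is the small local experiment I have been using to poke at this (a minimal sketch; cat_col and the row counts are made up purely for illustration):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._

  val spark = SparkSession.builder.master("local[*]").appName("repartition-test").getOrCreate()

  // 10,000 rows with 20 distinct values of cat_col
  val df = spark.range(0, 10000).withColumn("cat_col", (col("id") % 20).cast("string"))

  val rep = df.repartition(50, col("cat_col"))

  // The result still reports 50 partitions...
  println(rep.rdd.getNumPartitions)  // 50

  // ...but each row lands in the partition chosen by hashing cat_col into the 50 buckets,
  // so with 20 distinct values at most 20 of the 50 partitions actually contain rows
  // (fewer if two values hash to the same bucket); the rest stay empty.
  val perPartitionCounts = rep.rdd.mapPartitions(it => Iterator(it.size)).collect()
  println(s"non-empty partitions: ${perPartitionCounts.count(_ > 0)} of ${perPartitionCounts.length}")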
Re: Repartition question
Hi,

It is possible to control the number of partitions for the RDD without calling repartition by setting the max split size for the Hadoop input format used. Tracing through the code, XmlInputFormat extends FileInputFormat, which determines the number of splits (which NewHadoopRDD uses to determine the number of partitions: https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L95) from a few configs: https://github.com/apache/hadoop/blob/branch-2.3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L200

public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";
public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";

If you are setting SparkConf fields, prefix the keys with spark.hadoop and they will end up on the Hadoop conf used for the above values.

Richard Marscher
Software Engineer, Localytics
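P.S. Concretely, a sketch of what that looks like when building the SparkConf (the 16 MB cap and the app name are just example values):

  import org.apache.spark.{SparkConf, SparkContext}

  // Cap each input split at ~16 MB, so a 100 MB file is read as ~7 partitions
  // instead of ~4, without any repartition() call afterwards.
  val conf = new SparkConf()
    .setAppName("wiki-parse")
    // The "spark.hadoop." prefix makes Spark copy the key onto the Hadoop
    // Configuration that FileInputFormat reads SPLIT_MAXSIZE from.
    .set("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize", (16 * 1024 * 1024).toString)

  val sc = new SparkContext(conf)
  // readFile(path, sc) from ParseWikipedia should now return an RDD with more, smaller partitions.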
Repartition question
Hi All,

I am running the Wikipedia parsing example from the Advanced Analytics with Spark book: https://github.com/sryza/aas/blob/d3f62ef3ed43a59140f4ae8afbe2ef81fc643ef2/ch06-lsa/src/main/scala/com/cloudera/datascience/lsa/ParseWikipedia.scala#l112

The partitions of the RDD returned by the readFile function (mentioned above) are about 32 MB in size, so if my file is 100 MB, the RDD gets created with 4 partitions of approximately 32 MB each. I am running this in standalone Spark cluster mode; everything is working fine, I am just a little confused about the number of partitions and their size.

I want to increase the number of partitions of the RDD to make better use of the cluster. Is calling repartition() after this the only option, or can I pass something to the above method to get more partitions in the RDD?

Please let me know. Thanks.
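P.S. For what it's worth, the straightforward thing I can do today is repartition right after the read (a rough sketch, assuming readFile(path, sc) as in the linked ParseWikipedia code; the target of 16 is just a guess at a small multiple of the cores in my cluster):

  val plainText = readFile(path, sc)          // ~4 partitions for a 100 MB file
  val widened   = plainText.repartition(16)   // shuffles the data into 16 partitions

But I would prefer to get more partitions directly from the read, if that is possible.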