balaji krishnan created SPARK-20309:
---------------------------------------

             Summary: Repartitioning - more than the default number of 
partitions
                 Key: SPARK-20309
                 URL: https://issues.apache.org/jira/browse/SPARK-20309
             Project: Spark
          Issue Type: Bug
          Components: Java API, SQL
    Affects Versions: 1.6.1
         Environment: Spark 1.6.1, Hadoop 2.6.2, Redhat Linux
            Reporter: balaji krishnan


We have a requirement to process roughly 45 million rows. Each row has a KEY 
column, and there are roughly 25000 unique KEY values. I run the following 
Spark SQL to distribute the rows into partitions by KEY:
hiveContext.sql("select * from BigTbl distribute by KEY")
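(As a side note, a minimal sketch for checking how many partitions the query 
actually yields; toJavaRDD() and partitions() are standard Spark 1.6 APIs. 
DISTRIBUTE BY hashes rows into spark.sql.shuffle.partitions buckets, 200 by 
default, so several KEYs can share one bucket.)

    // Sketch: inspect the partition count produced by the DISTRIBUTE BY query.
    JavaRDD<Row> distributed =
            hiveContext.sql("select * from BigTbl distribute by KEY").toJavaRDD();
    System.out.println("partitions: " + distributed.partitions().size());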
This nicely groups all rows with the same KEY into the same partition. Using 
mapPartitions we process all of these partitions in parallel. I have a variable 
that should be initialized once per partition; let's call it designName. It is 
initialized with the partition's KEY value for later identification purposes:

FlatMapFunction<java.util.Iterator<Row>, DSGType> flatMapSetup =
        new FlatMapFunction<java.util.Iterator<Row>, DSGType>() {
    String designName = null; // set once per partition

    @Override
    public java.lang.Iterable<DSGType> call(java.util.Iterator<Row> it) throws Exception {
        while (it.hasNext()) {
            Row row = it.next(); // row is of type org.apache.spark.sql.Row
            if (designName == null) {
                designName = row.getString(1); // the partition's KEY value
            }
            .....
        }
    }
};

List<DSGType> Dsg =  partitionedRecs.mapPartitions(flatMapSetup).collect();
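(For debugging, a minimal sketch that tags each row with its partition index 
to verify which KEYs land in which partition. mapPartitionsWithIndex and 
org.apache.spark.api.java.function.Function2 are standard Spark 1.6 Java APIs; 
column 1 is assumed to hold the KEY, as in the snippet above.)

    // Debugging sketch: pair every row's KEY with its partition index so the
    // KEY-to-partition mapping can be checked before and after repartitioning.
    JavaRDD<String> tagged = partitionedRecs.mapPartitionsWithIndex(
            new Function2<Integer, java.util.Iterator<Row>, java.util.Iterator<String>>() {
                @Override
                public java.util.Iterator<String> call(Integer idx, java.util.Iterator<Row> rows) {
                    java.util.List<String> out = new java.util.ArrayList<String>();
                    while (rows.hasNext()) {
                        out.add(idx + "\t" + rows.next().getString(1)); // partition index + KEY
                    }
                    return out.iterator();
                }
            }, true);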
The problem starts here for me.

Since I have more than the default number of shuffle partitions (200), I 
repartition to the number of unique KEYs in BigTbl. However, after the 
repartition the designName values get completely mixed up: instead of 
processing a total of 25000 unique values, only 15000 values are processed. 
For some reason the repartition completely destroys the per-KEY grouping 
produced by the previous step. These are the statements I use to DISTRIBUTE BY 
and subsequently repartition:

long distinctValues = hiveContext.sql("select KEY from BigTbl").distinct().count();
JavaRDD<Row> partitionedRecs = hiveContext.sql("select * from BigTbl DISTRIBUTE by SKU ")
        .repartition((int) distinctValues).toJavaRDD().cache();
I tried another approach: changing spark.sql.shuffle.partitions during 
SparkContext creation. Even that did not help; I get the same issue.

new JavaSparkContext(new SparkConf()
        .set("spark.driver.maxResultSize", "0")
        .set("spark.sql.shuffle.partitions", "26000"))
Is there a way to solve this issue, or is this a DISTRIBUTE BY bug? We are 
using Spark 1.6.1.

Regards

Bala


