[ https://issues.apache.org/jira/browse/SPARK-20309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-20309.
-------------------------------
    Resolution: Not A Problem

> Repartitioning - more than the default number of partitions
> -----------------------------------------------------------
>
>                 Key: SPARK-20309
>                 URL: https://issues.apache.org/jira/browse/SPARK-20309
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API, SQL
>    Affects Versions: 1.6.1
>         Environment: Spark 1.6.1, Hadoop 2.6.2, Redhat Linux
>            Reporter: balaji krishnan
>
> We have a requirement to process roughly 45 million rows. Each row has a
> KEY column, and there are roughly 25,000 unique KEY values. I run the
> following Spark SQL to get all 25,000 partitions:
> hiveContext.sql("select * from BigTbl distribute by KEY")
> This nicely creates all the required partitions. Using mapPartitions we
> process all of these partitions in parallel. I have a variable that should
> be initialized once per partition; let's call it designName. It is
> initialized with the KEY value (the partition's value) for future
> identification purposes.
> FlatMapFunction<java.util.Iterator<Row>, DSGType> flatMapSetup =
>         new FlatMapFunction<java.util.Iterator<Row>, DSGType>() {
>     String designName = null;
>     @Override
>     public java.lang.Iterable<DSGType> call(java.util.Iterator<Row> it)
>             throws Exception {
>         while (it.hasNext()) {
>             Row row = it.next(); // row is of type org.apache.spark.sql.Row
>             if (designName == null) {
>                 designName = row.getString(1); // remember this partition's KEY
>             }
>             ..... // remaining per-row processing elided
>         }
>         ..... // building and returning the Iterable<DSGType> elided
>     }
> };
> List<DSGType> Dsg = partitionedRecs.mapPartitions(flatMapSetup).collect();
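> For reference, a minimal self-contained sketch of the once-per-partition
> initialization pattern above (DSGType is simplified to String purely for
> the sketch, and the local-variable form avoids carrying state on the
> function object):
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.sql.Row;
>
> FlatMapFunction<Iterator<Row>, String> perPartition =
>         new FlatMapFunction<Iterator<Row>, String>() {
>     @Override
>     public Iterable<String> call(Iterator<Row> it) throws Exception {
>         String designName = null;              // set once per partition
>         List<String> out = new ArrayList<String>();
>         while (it.hasNext()) {
>             Row row = it.next();
>             if (designName == null) {
>                 designName = row.getString(1); // this partition's KEY
>             }
>             out.add(designName);               // stand-in for real work
>         }
>         return out;
>     }
> };
> Note the assumption baked into this pattern: DISTRIBUTE BY guarantees that
> one KEY never spans two partitions, not that a partition contains only one
> KEY, so hash collisions can put several KEYs together.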
> The problem starts here for me.
> Since I have more than the default number of partitions (200), I
> repartition to the number of unique KEYs in BigTbl. However, after the
> repartition the designName values get completely confused: instead of
> processing all 25,000 unique values, only about 15,000 are processed. For
> some reason the repartition destroys the uniqueness established by the
> previous step. The statements I use to 'distribute by' and subsequently
> repartition:
> long distinctValues = hiveContext.sql("select KEY from BigTbl")
>         .distinct().count();
> JavaRDD<Row> partitionedRecs = hiveContext
>         .sql("select * from BigTbl DISTRIBUTE by SKU ")
>         .repartition((int) distinctValues)
>         .toJavaRDD()
>         .cache();
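> As I understand it (an assumption on my part), RDD.repartition(n)
> redistributes rows round-robin with no regard to KEY, which would explain
> the mix-up. A sketch using the column-aware
> DataFrame.repartition(numPartitions, partitionExprs) overload added in
> 1.6, which hash-partitions by the given column instead:
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.sql.Row;
> import static org.apache.spark.sql.functions.col;
>
> long distinctValues = hiveContext.sql("select KEY from BigTbl")
>         .distinct().count();
> // Hash-partition by KEY with an explicit partition count, rather than
> // round-robin RDD.repartition(int) after DISTRIBUTE BY.
> JavaRDD<Row> keyedRecs = hiveContext.sql("select * from BigTbl")
>         .repartition((int) distinctValues, col("KEY"))
>         .toJavaRDD()
>         .cache();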
> I tried another approach: changing spark.sql.shuffle.partitions at
> SparkContext creation. Even that did not help; I get the same issue.
> new JavaSparkContext(new SparkConf()
>         .set("spark.driver.maxResultSize", "0")
>         .set("spark.sql.shuffle.partitions", "26000"))
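> A related sketch: since spark.sql.shuffle.partitions sets how many
> partitions DISTRIBUTE BY shuffles into, with 26,000 partitions for
> ~25,000 KEYs the later repartition() call should be unnecessary and can
> simply be dropped (assuming the config is picked up by the HiveContext):
> // DISTRIBUTE BY alone decides the partitioning here; no RDD.repartition()
> // afterwards. Hashing still allows several KEYs to share one partition,
> // but never splits one KEY across partitions.
> JavaRDD<Row> partitionedRecs = hiveContext
>         .sql("select * from BigTbl distribute by KEY")
>         .toJavaRDD()
>         .cache();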
> Is there a way to solve this issue, or is this a DISTRIBUTE BY bug? We are
> using Spark 1.6.1.
> Regards,
> Bala


