[ https://issues.apache.org/jira/browse/SPARK-20309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-20309.
-------------------------------
    Resolution: Not A Problem

Repartitioning - more than the default number of partitions
-----------------------------------------------------------

                Key: SPARK-20309
                URL: https://issues.apache.org/jira/browse/SPARK-20309
            Project: Spark
         Issue Type: Bug
         Components: Java API, SQL
   Affects Versions: 1.6.1
        Environment: Spark 1.6.1, Hadoop 2.6.2, Redhat Linux
           Reporter: balaji krishnan

We have a requirement to process roughly 45MM rows. Each row has a KEY column, and the number of unique KEYs is roughly 25,000. I run the following Spark SQL to get all 25,000 partitions:

    hiveContext.sql("select * from BigTbl distribute by KEY")

This nicely creates all the required partitions. Using mapPartitions we then process all of these partitions in parallel. I have a variable that should be initialized once per partition; let's call it designName. It is initialized with the KEY value (the partition value) for future identification purposes:

    FlatMapFunction<java.util.Iterator<Row>, DSGType> flatMapSetup =
            new FlatMapFunction<java.util.Iterator<Row>, DSGType>() {
        String designName = null;

        @Override
        public java.lang.Iterable<DSGType> call(java.util.Iterator<Row> it)
                throws Exception {
            // result collection added to complete the truncated snippet
            List<DSGType> results = new ArrayList<DSGType>();
            while (it.hasNext()) {
                Row row = it.next(); // row is of type org.apache.spark.sql.Row
                if (designName == null) {
                    designName = row.getString(1); // first row's KEY identifies this partition
                }
                // ... per-row processing elided in the original report ...
            }
            return results;
        }
    };

    List<DSGType> Dsg = partitionedRecs.mapPartitions(flatMapSetup).collect();

The problem starts here for me. Since I have more than the default number of partitions (200), I repartition to the number of unique KEYs in BigTbl. After the repartition, however, designName gets completely confused: instead of processing a total of 25,000 unique values, only 15,000 values are processed.
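This behaviour is expected rather than a bug: repartition(n) performs a row-level shuffle that ignores the key grouping DISTRIBUTE BY just produced. A minimal plain-Java sketch of the two placement strategies (class and method names are hypothetical, not Spark APIs):

```java
import java.util.*;

// Sketch (not Spark source): contrasts key-hash partitioning, which is what
// DISTRIBUTE BY does, with the key-oblivious round-robin redistribution that
// a plain repartition(n) performs on the rows afterwards.
public class RepartitionSketch {

    // DISTRIBUTE BY KEY: rows are routed by a hash of the key, so all rows
    // sharing a key land in the same partition.
    static Map<Integer, Set<String>> hashPartition(List<String> keys, int numPartitions) {
        Map<Integer, Set<String>> parts = new HashMap<>();
        for (String k : keys) {
            int p = Math.floorMod(k.hashCode(), numPartitions);
            parts.computeIfAbsent(p, x -> new TreeSet<>()).add(k);
        }
        return parts;
    }

    // repartition(n) on an RDD: rows are spread evenly without regard to the
    // key, so one partition can hold rows from several keys, and one key can
    // be split across partitions.
    static Map<Integer, Set<String>> roundRobin(List<String> keys, int numPartitions) {
        Map<Integer, Set<String>> parts = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            parts.computeIfAbsent(i % numPartitions, x -> new TreeSet<>()).add(keys.get(i));
        }
        return parts;
    }
}
```

After the round-robin step each partition can see rows from several KEYs, so a designName captured from the first row no longer identifies the whole partition, which would explain observing fewer unique values than expected.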
For some reason the repartition completely destroys the key grouping established by the previous step. The statements I use to distribute and then repartition:

    long distinctValues = hiveContext.sql("select KEY from BigTbl").distinct().count();
    JavaRDD<Row> partitionedRecs = hiveContext.sql("select * from BigTbl DISTRIBUTE by SKU ")
            .repartition((int) distinctValues).toJavaRDD().cache();

I tried another solution: changing spark.sql.shuffle.partitions during SparkContext creation. Even that did not help; I get the same issue:

    new JavaSparkContext(new SparkConf()
            .set("spark.driver.maxResultSize", "0")
            .set("spark.sql.shuffle.partitions", "26000"));

Is there a way to solve this issue, please, or is this a DISTRIBUTE BY bug? We are using Spark 1.6.1.

Regards
Bala

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
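Consistent with the "Not A Problem" resolution, the usual approach is to let DISTRIBUTE BY produce the desired partition count directly and to drop the subsequent repartition() call. A sketch under the assumption of the Spark 1.6 HiveContext API (not taken from the issue itself, and untested here):

```java
// Set the shuffle parallelism on the SQL context before running the query,
// then keep the DISTRIBUTE BY output as-is -- no repartition() afterwards.
hiveContext.setConf("spark.sql.shuffle.partitions", "26000");
JavaRDD<Row> partitionedRecs = hiveContext
        .sql("select * from BigTbl DISTRIBUTE BY KEY")
        .toJavaRDD()
        .cache();
```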