balaji krishnan created SPARK-20309:
---------------------------------------
Summary: Repartitioning - more than the default number of partitions
Key: SPARK-20309
URL: https://issues.apache.org/jira/browse/SPARK-20309
Project: Spark
Issue Type: Bug
Components: Java API, SQL
Affects Versions: 1.6.1
Environment: Spark 1.6.1, Hadoop 2.6.2, Red Hat Linux
Reporter: balaji krishnan

We have a requirement to process roughly 45 million rows. Each row has a KEY column, with roughly 25000 unique KEY values. I run the following Spark SQL statement to get all 25000 partitions:

    hiveContext.sql("select * from BigTbl distribute by KEY")

This nicely creates all of the required partitions. Using mapPartitions we then process all of these partitions in parallel. I have a variable that should be initialized once per partition; let's call it designName. It is initialized with the KEY value of the partition, for future identification purposes:

    FlatMapFunction<java.util.Iterator<Row>, DSGType> flatMapSetup =
        new FlatMapFunction<java.util.Iterator<Row>, DSGType>() {
          String designName = null;

          @Override
          public java.lang.Iterable<DSGType> call(java.util.Iterator<Row> it) throws Exception {
            while (it.hasNext()) {
              Row row = it.next(); // row is of type org.apache.spark.sql.Row
              if (designName == null) {
                designName = row.getString(1); // set once, from the first row of the partition
              }
              // ..... per-row processing elided
            }
            // ..... the per-partition results are built up and returned here
          }
        };

    List<DSGType> Dsg = partitionedRecs.mapPartitions(flatMapSetup).collect();

The problem starts here for me. Since I have more than the default number of shuffle partitions (200), I repartition to the number of unique KEY values in BigTbl. However, once I repartition, the designName logic gets completely confused: instead of processing all 25000 unique values, only about 15000 values are processed. For some reason the repartition completely destroys the per-key grouping of the previous step. The statements I use to 'distribute by' and subsequently repartition are:

    long distinctValues = hiveContext.sql("select KEY from BigTbl").distinct().count();

    JavaRDD<Row> partitionedRecs = hiveContext
        .sql("select * from BigTbl DISTRIBUTE by SKU ")
        .repartition((int) distinctValues)
        .toJavaRDD()
        .cache();

I tried another solution, namely setting spark.sql.shuffle.partitions during SparkContext creation, but that did not help either; I get the same issue:

    new JavaSparkContext(new SparkConf()
        .set("spark.driver.maxResultSize", "0")
        .set("spark.sql.shuffle.partitions", "26000"));

Is there a way to solve this issue, or is this a DISTRIBUTE BY bug? We are using Spark 1.6.1.

Regards,
Bala
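PS: For reference, a minimal sketch of the workaround I plan to try next: raising spark.sql.shuffle.partitions on the HiveContext itself via setConf, so that DISTRIBUTE BY alone produces enough partitions and the repartition() call can be dropped entirely. My working assumption is that repartition() reshuffles rows without regard to KEY, which would explain the broken grouping; that setConf is honored at query planning time is also an assumption I have not verified. Table and column names are the same as above; flatMapSetup and DSGType are from the code in the report.

    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.hive.HiveContext;

    // Sketch only: raise the SQL shuffle partition count on the HiveContext
    // before running the DISTRIBUTE BY query, instead of calling repartition()
    // on the result (which re-shuffles rows without regard to KEY).
    // The 26000 figure simply matches the value already tried on the SparkConf above.
    hiveContext.setConf("spark.sql.shuffle.partitions", "26000");

    JavaRDD<Row> partitionedRecs = hiveContext
        .sql("select * from BigTbl distribute by KEY")
        .toJavaRDD()
        .cache();

    // flatMapSetup is the FlatMapFunction shown above. DISTRIBUTE BY sends all
    // rows with the same KEY to the same partition, so designName stays
    // consistent within a partition (note, though, that a partition may still
    // receive more than one KEY if two keys hash to the same partition).
    List<DSGType> Dsg = partitionedRecs.mapPartitions(flatMapSetup).collect();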