[jira] [Updated] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause
[ https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lior Chaga updated SPARK-7032:
------------------------------

Description:

When using a UNION/EXCEPT clause together with a GROUP BY clause in Spark SQL, the results do not match what is expected. In the following example, only 1 record should be in the first table and not in the second (when grouping by the key field, the counter for key=1 is 10 in both tables). Each of the clauses works properly when run on its own.

{code}
//import com.addthis.metrics.reporter.config.ReporterConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.Row;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SimpleApp {
    public static void main(String[] args) throws IOException {
        SparkConf conf = new SparkConf().setAppName("Simple Application")
                .setMaster("local[1]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<MyObject> firstList = new ArrayList<MyObject>(2);
        firstList.add(new MyObject(1, 10));
        firstList.add(new MyObject(2, 10));

        List<MyObject> secondList = new ArrayList<MyObject>(3);
        secondList.add(new MyObject(1, 4));
        secondList.add(new MyObject(1, 6));
        secondList.add(new MyObject(2, 8));

        JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
        JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

        JavaSQLContext sqlc = new JavaSQLContext(sc);
        sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1");
        sqlc.sqlContext().cacheTable("table1");
        sqlc.applySchema(secondRdd, MyObject.class).registerTempTable("table2");
        sqlc.sqlContext().cacheTable("table2");

        List<Row> firstMinusSecond = sqlc.sql(
                "SELECT key, counter FROM table1 " +
                "EXCEPT " +
                "SELECT key, SUM(counter) FROM table2 " +
                "GROUP BY key").collect();

        System.out.println("num of rows in first but not in second = ["
                + firstMinusSecond.size() + "]");

        sc.close();
        System.exit(0);
    }

    public static class MyObject implements Serializable {
        public MyObject(Integer key, Integer counter) {
            this.key = key;
            this.counter = counter;
        }

        private Integer key;
        private Integer counter;

        public Integer getKey() { return key; }
        public void setKey(Integer key) { this.key = key; }
        public Integer getCounter() { return counter; }
        public void setCounter(Integer counter) { this.counter = counter; }
    }
}
{code}
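For reference, the expected result can be worked out by hand from the example data. Below is a minimal plain-Java sketch (no Spark involved; the class and variable names are mine, only the table contents and the EXCEPT/GROUP BY semantics come from the example above) that aggregates table2 by key and counts the table1 rows absent from the result:

{code}
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpectedResult {
    public static void main(String[] args) {
        // table1 rows as (key, counter) pairs
        List<int[]> table1 = Arrays.asList(new int[]{1, 10}, new int[]{2, 10});
        // table2 rows before grouping
        List<int[]> table2 = Arrays.asList(new int[]{1, 4}, new int[]{1, 6}, new int[]{2, 8});

        // SELECT key, SUM(counter) FROM table2 GROUP BY key  ->  {1=10, 2=8}
        Map<Integer, Integer> grouped = new HashMap<Integer, Integer>();
        for (int[] row : table2) {
            Integer sum = grouped.get(row[0]);
            grouped.put(row[0], sum == null ? row[1] : sum + row[1]);
        }

        // table1 EXCEPT grouped: table1 rows whose (key, counter) pair
        // does not appear in the grouped result
        int count = 0;
        for (int[] row : table1) {
            Integer sum = grouped.get(row[0]);
            if (sum == null || sum.intValue() != row[1]) {
                count++;
            }
        }
        // Prints 1: only (2, 10) survives, since SUM(counter) for key=2 is 8, not 10.
        System.out.println("expected rows in first but not in second = [" + count + "]");
    }
}
{code}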
[jira] [Updated] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause
[ https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lior Chaga updated SPARK-7032:
------------------------------

Description:

When using a UNION/EXCEPT clause together with a GROUP BY clause in Spark SQL, the results do not match what is expected. In the following example, only 1 record should be in the first table and not in the second (when grouping by the key field, the counter for key=1 is 10 in both tables). Each of the clauses works properly when run on its own.

{code}
//import com.addthis.metrics.reporter.config.ReporterConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.Row;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SimpleApp {
    public static void main(String[] args) throws IOException {
        SparkConf conf = new SparkConf().setAppName("Simple Application")
                .setMaster("local[1]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<MyObject> firstList = new ArrayList<MyObject>(2);
        firstList.add(new MyObject(1, 10));
        firstList.add(new MyObject(2, 10));

        List<MyObject> secondList = new ArrayList<MyObject>(3);
        secondList.add(new MyObject(1, 4));
        secondList.add(new MyObject(1, 6));
        secondList.add(new MyObject(2, 8));

        JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
        JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

        JavaSQLContext sqlc = new JavaSQLContext(sc);
        sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1");
        sqlc.sqlContext().cacheTable("table1");
        sqlc.applySchema(secondRdd, MyObject.class).registerTempTable("table2");
        sqlc.sqlContext().cacheTable("table2");

        List<Row> firstMinusSecond = sqlc.sql(
                "SELECT key, counter FROM table1 " +
                "EXCEPT " +
                "SELECT key, SUM(counter) FROM table2 " +
                "GROUP BY key").collect();

        System.out.println("num of rows in first but not in second = ["
                + firstMinusSecond.size() + "]");

        sc.close();
        System.exit(0);
    }

    public static class MyObject implements Serializable {
        public MyObject(Integer key, Integer counter) {
            this.key = key;
            this.counter = counter;
        }

        private Integer key;
        private Integer counter;

        public Integer getKey() { return key; }
        public void setKey(Integer key) { this.key = key; }
        public Integer getCounter() { return counter; }
        public void setCounter(Integer counter) { this.counter = counter; }
    }
}
{code}

Roughly the same example with DataFrames: without groupBy it works correctly, but with groupBy I get 2 rows instead of 1:

{code}
SparkConf conf = new SparkConf().setAppName("Simple Application")
        .setMaster("local[1]");
JavaSparkContext sc = new JavaSparkContext(conf);

List<MyObject> firstList = new ArrayList<MyObject>(2);
firstList.add(new MyObject(1, 10));
firstList.add(new MyObject(2, 10));

List<MyObject> secondList = new ArrayList<MyObject>(3);
secondList.add(new MyObject(1, 10));
secondList.add(new MyObject(2, 8));

JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

SQLContext sqlc = new SQLContext(sc);
DataFrame firstDataFrame = sqlc.createDataFrame(firstRdd, MyObject.class);
DataFrame secondDataFrame = sqlc.createDataFrame(secondRdd, MyObject.class);

Row[] collect = firstDataFrame.except(secondDataFrame).collect();
System.out.println("num of rows in first but not in second = ["
        + collect.length + "]");

DataFrame secondAggregated = secondDataFrame.groupBy("key").sum("counter");
Row[] collectAgg = firstDataFrame.except(secondAggregated).collect();
System.out.println("num of rows in first but not in second = ["
        + collectAgg.length + "]"); // should be 1 row, but there are 2
{code}
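One hypothetical way to narrow the DataFrame variant down (an untested sketch continuing the snippet above; the variable names are mine, and it assumes the SQLContext.createDataFrame(JavaRDD<Row>, StructType) overload plus java.util.Arrays and org.apache.spark.sql.Row imports): materialize the aggregated rows and rebuild an equivalent DataFrame, so that except() runs against a plain relation instead of the aggregation plan:

{code}
// Hypothetical diagnostic, continuing the example above (untested sketch):
// collect the grouped rows and rebuild an equivalent DataFrame from them.
List<Row> aggRows = Arrays.asList(secondAggregated.collect());
DataFrame materialized = sqlc.createDataFrame(
        sc.parallelize(aggRows), secondAggregated.schema());

Row[] collectMaterialized = firstDataFrame.except(materialized).collect();
// If this prints 1 while except(secondAggregated) prints 2, the wrong
// results would point at how except is planned over the aggregate,
// rather than at the data itself.
System.out.println("rows after materializing the aggregation = ["
        + collectMaterialized.length + "]");
{code}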