[ https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lior Chaga closed SPARK-7032.
-----------------------------
    Resolution: Not A Problem

SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause
------------------------------------------------------------------------

                 Key: SPARK-7032
                 URL: https://issues.apache.org/jira/browse/SPARK-7032
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.2.2, 1.3.1
            Reporter: Lior Chaga

When using a UNION/EXCEPT clause together with a GROUP BY clause in Spark SQL, the results do not match the expected output.
In the following example, only one record should be in the first table and not in the second: grouping table2 by the key field gives a counter of 10 for key=1, the same as in table1, so the EXCEPT should return the single row (2, 10).
Each of the clauses works correctly when run on its own.

{code}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.Row;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SimpleApp {
    public static void main(String[] args) throws IOException {
        SparkConf conf = new SparkConf().setAppName("Simple Application")
                .setMaster("local[1]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<MyObject> firstList = new ArrayList<MyObject>(2);
        firstList.add(new MyObject(1, 10));
        firstList.add(new MyObject(2, 10));

        List<MyObject> secondList = new ArrayList<MyObject>(3);
        secondList.add(new MyObject(1, 4));
        secondList.add(new MyObject(1, 6));
        secondList.add(new MyObject(2, 8));

        JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
        JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

        JavaSQLContext sqlc = new JavaSQLContext(sc);
        sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1");
        sqlc.sqlContext().cacheTable("table1");
        sqlc.applySchema(secondRdd, MyObject.class).registerTempTable("table2");
        sqlc.sqlContext().cacheTable("table2");

        // EXCEPT the raw rows of table1 against the aggregated rows of table2.
        List<Row> firstMinusSecond = sqlc.sql(
                "SELECT key, counter FROM table1 " +
                "EXCEPT " +
                "SELECT key, SUM(counter) FROM table2 " +
                "GROUP BY key ").collect();

        System.out.println("num of rows in first but not in second = [" +
                firstMinusSecond.size() + "]");
        sc.close();
        System.exit(0);
    }

    public static class MyObject implements Serializable {
        public MyObject(Integer key, Integer counter) {
            this.key = key;
            this.counter = counter;
        }

        private Integer key;
        private Integer counter;

        public Integer getKey() {
            return key;
        }

        public void setKey(Integer key) {
            this.key = key;
        }

        public Integer getCounter() {
            return counter;
        }

        public void setCounter(Integer counter) {
            this.counter = counter;
        }
    }
}
{code}
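The ticket was closed above as Not A Problem. One plausible explanation, which is an assumption on my part and not stated in this thread, is a type mismatch: in Spark SQL, SUM over an INT column produces a BIGINT, so the EXCEPT compares (INT, INT) rows from table1 against (INT, BIGINT) rows from the aggregated table2 and never finds equal rows. Under that assumption, casting the aggregate back to INT should restore the expected single-row result; a minimal sketch of the reworked query:

{code}
// Sketch under the assumption that SUM(counter) returns BIGINT while
// table1.counter is INT, so the uncast EXCEPT matches no rows.
List<Row> firstMinusSecond = sqlc.sql(
        "SELECT key, counter FROM table1 " +
        "EXCEPT " +
        "SELECT key, CAST(SUM(counter) AS INT) FROM table2 " +
        "GROUP BY key ").collect();
// Expected: one row, (2, 10), since key=1 sums to 10 in both tables.
{code}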
The same example, give or take, with DataFrames: without groupBy it works correctly, but with groupBy I get 2 rows instead of 1:

{code}
// Same MyObject class as above; uses org.apache.spark.sql.SQLContext,
// org.apache.spark.sql.DataFrame and org.apache.spark.sql.Row (Spark 1.3).
SparkConf conf = new SparkConf().setAppName("Simple Application")
        .setMaster("local[1]");
JavaSparkContext sc = new JavaSparkContext(conf);

List<MyObject> firstList = new ArrayList<MyObject>(2);
firstList.add(new MyObject(1, 10));
firstList.add(new MyObject(2, 10));

List<MyObject> secondList = new ArrayList<MyObject>(3);
secondList.add(new MyObject(1, 10));
secondList.add(new MyObject(2, 8));

JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

SQLContext sqlc = new SQLContext(sc);
DataFrame firstDataFrame = sqlc.createDataFrame(firstRdd, MyObject.class);
DataFrame secondDataFrame = sqlc.createDataFrame(secondRdd, MyObject.class);

// Without aggregation, except() returns the expected single row.
Row[] collect = firstDataFrame.except(secondDataFrame).collect();
System.out.println("num of rows in first but not in second = [" +
        collect.length + "]");

// With aggregation, except() unexpectedly keeps both rows of table1.
DataFrame secondAggregated = secondDataFrame.groupBy("key").sum("counter");
Row[] collectAgg = firstDataFrame.except(secondAggregated).collect();
System.out.println("num of rows in first but not in second = [" +
        collectAgg.length + "]"); // should be 1 row, but there are 2
{code}
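If the same type mismatch is the cause on the DataFrame side, aligning the aggregate column's type before the except() should help. A hedged sketch for the 1.3 DataFrame API; the generated aggregate column name, assumed here to be "SUM(counter)", can vary between Spark versions:

{code}
// Assumes groupBy().sum() names the aggregate column "SUM(counter)" and
// types it as BIGINT; cast it back to INT so the schemas line up for except().
DataFrame secondAggregated = secondDataFrame.groupBy("key").sum("counter");
DataFrame secondAggregatedAsInt = secondAggregated.select(
        secondAggregated.col("key"),
        secondAggregated.col("SUM(counter)").cast("int"));
Row[] collectAgg = firstDataFrame.except(secondAggregatedAsInt).collect();
System.out.println("num of rows in first but not in second = [" +
        collectAgg.length + "]"); // expected: 1
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)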