[jira] [Updated] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause

2015-04-26 Thread Lior Chaga (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga updated SPARK-7032:
--
Description: 
When using a UNION/EXCEPT clause together with a GROUP BY clause in Spark SQL, the results do not match the expected output.

In the following example, only 1 record should be in the first table and not in the second (when grouping by the key field, the summed counter for key=1 is 10 in both tables). Each of the clauses works correctly when run on its own.


{code}
// import com.addthis.metrics.reporter.config.ReporterConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.Row;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SimpleApp {

    public static void main(String[] args) throws IOException {

        SparkConf conf = new SparkConf().setAppName("Simple Application")
                .setMaster("local[1]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<MyObject> firstList = new ArrayList<MyObject>(2);
        firstList.add(new MyObject(1, 10));
        firstList.add(new MyObject(2, 10));

        List<MyObject> secondList = new ArrayList<MyObject>(3);
        secondList.add(new MyObject(1, 4));
        secondList.add(new MyObject(1, 6));
        secondList.add(new MyObject(2, 8));

        JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
        JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

        JavaSQLContext sqlc = new JavaSQLContext(sc);
        sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1");
        sqlc.sqlContext().cacheTable("table1");
        sqlc.applySchema(secondRdd, MyObject.class).registerTempTable("table2");
        sqlc.sqlContext().cacheTable("table2");

        List<Row> firstMinusSecond = sqlc.sql(
                "SELECT key, counter FROM table1 " +
                "EXCEPT " +
                "SELECT key, SUM(counter) FROM table2 " +
                "GROUP BY key ").collect();

        System.out.println("num of rows in first but not in second = [" +
                firstMinusSecond.size() + "]");

        sc.close();
        System.exit(0);
    }

    public static class MyObject implements Serializable {

        public MyObject(Integer key, Integer counter) {
            this.key = key;
            this.counter = counter;
        }

        private Integer key;
        private Integer counter;

        public Integer getKey() {
            return key;
        }

        public void setKey(Integer key) {
            this.key = key;
        }

        public Integer getCounter() {
            return counter;
        }

        public void setCounter(Integer counter) {
            this.counter = counter;
        }
    }
}
{code}
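
For reference, the expected EXCEPT result can be worked out by hand over the same data. The following is a minimal, Spark-free sketch (class and variable names are purely illustrative): grouping table2 by key gives (1, 10) and (2, 8), so the only row of table1 absent from the aggregated table2 is (2, 10), i.e. exactly 1 row.

{code}
import java.util.*;

public class ExpectedExceptResult {

    public static void main(String[] args) {
        // table1 rows as (key, counter) pairs
        Set<List<Integer>> table1 = new HashSet<List<Integer>>();
        table1.add(Arrays.asList(1, 10));
        table1.add(Arrays.asList(2, 10));

        // table2 rows, then SUM(counter) GROUP BY key computed by hand
        int[][] table2 = {{1, 4}, {1, 6}, {2, 8}};
        Map<Integer, Integer> sums = new HashMap<Integer, Integer>();
        for (int[] row : table2) {
            Integer current = sums.get(row[0]);
            sums.put(row[0], (current == null ? 0 : current) + row[1]);
        }
        Set<List<Integer>> aggregatedTable2 = new HashSet<List<Integer>>();
        for (Map.Entry<Integer, Integer> e : sums.entrySet()) {
            aggregatedTable2.add(Arrays.asList(e.getKey(), e.getValue()));
        }

        // EXCEPT semantics: rows of table1 that do not appear in the
        // aggregated table2
        Set<List<Integer>> difference = new HashSet<List<Integer>>(table1);
        difference.removeAll(aggregatedTable2);
        System.out.println(difference); // expected: [[2, 10]] -> 1 row
    }
}
{code}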

The same example, more or less, with DataFrames: without groupBy it works correctly, but with groupBy I get 2 rows instead of the expected 1:

{code}
SparkConf conf = new SparkConf().setAppName("Simple Application")
        .setMaster("local[1]");
JavaSparkContext sc = new JavaSparkContext(conf);

List<MyObject> firstList = new ArrayList<MyObject>(2);
firstList.add(new MyObject(1, 10));
firstList.add(new MyObject(2, 10));

List<MyObject> secondList = new ArrayList<MyObject>(3);
secondList.add(new MyObject(1, 10));
secondList.add(new MyObject(2, 8));

JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

SQLContext sqlc = new SQLContext(sc);

DataFrame firstDataFrame = sqlc.createDataFrame(firstRdd, MyObject.class);
DataFrame secondDataFrame = sqlc.createDataFrame(secondRdd, MyObject.class);

Row[] collect = firstDataFrame.except(secondDataFrame).collect();
System.out.println("num of rows in first but not in second = [" +
        collect.length + "]");

DataFrame secondAggregated = secondDataFrame.groupBy("key").sum("counter");
Row[] collectAgg = firstDataFrame.except(secondAggregated).collect();
System.out.println("num of rows in first but not in second = [" +
        collectAgg.length + "]"); // should be 1 row, but there are 2
{code}
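
To make the mismatch easier to inspect, the schemas and contents of both sides of the except can be printed, and the aggregated frame can be renamed back to the original column names before comparing. This is only a rough diagnostic sketch that assumes the firstDataFrame / secondAggregated variables from the snippet above; the exact name and position of the aggregated sum column are version-dependent assumptions, not confirmed behaviour:

{code}
// Diagnostic sketch; assumes firstDataFrame and secondAggregated from above.
// createDataFrame(rdd, MyObject.class) typically derives the bean schema in
// alphabetical field order (counter, key), while groupBy("key").sum("counter")
// yields something like (key, SUM(counter)) - printing both shows whether the
// two sides of the except line up by name and position.
firstDataFrame.printSchema();
secondAggregated.printSchema();
firstDataFrame.show();
secondAggregated.show();

// Rename the sum column (its exact name varies across Spark versions, hence
// looked up from columns()) and reorder, so both sides carry the same schema
// before attempting the except again.
String sumColumn = secondAggregated.columns()[1];
DataFrame secondRealigned = secondAggregated
        .withColumnRenamed(sumColumn, "counter")
        .select("counter", "key");

Row[] collectRealigned = firstDataFrame.except(secondRealigned).collect();
System.out.println("after realigning schemas = [" + collectRealigned.length + "]");
{code}

If the realigned except still returns 2 rows, that would suggest the problem is not merely a column name or ordering mismatch between the two sides.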
