[jira] [Commented] (SPARK-6305) Add support for log4j 2.x to Spark

2015-03-16 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14362996#comment-14362996
 ] 

Lior Chaga commented on SPARK-6305:
---

Works by adding the log4j 2.x jars, together with the log4j-1.2-api bridge, to the 
classpath via SPARK_CLASSPATH. 
There is no need to change the Spark distribution. Closed the pull request.
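For reference, a minimal sketch of what this looks like from application code, assuming 
the log4j-api, log4j-core and log4j-1.2-api (bridge) jars were added via SPARK_CLASSPATH 
(illustrative only, the class name is made up):

{code}
// Existing log4j 1.x API calls keep working: the log4j-1.2-api bridge routes
// org.apache.log4j calls to the log4j 2.x backend, so neither Spark nor the
// application needs to be rebuilt.
import org.apache.log4j.Logger;

public class BridgeCheck {
    private static final Logger LOG = Logger.getLogger(BridgeCheck.class);

    public static void main(String[] args) {
        LOG.info("logging through the log4j-1.2 bridge onto log4j 2.x");
    }
}
{code}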

 Add support for log4j 2.x to Spark
 --

 Key: SPARK-6305
 URL: https://issues.apache.org/jira/browse/SPARK-6305
 Project: Spark
  Issue Type: New Feature
  Components: Build
Reporter: Tal Sliwowicz

 log4j 2 requires replacing the slf4j binding and adding the log4j jars to the 
 classpath. Since the jars are shaded, this must be done during the build.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause

2015-04-21 Thread Lior Chaga (JIRA)
Lior Chaga created SPARK-7032:
-

 Summary: SparkSQL incorrect results when using UNION/EXCEPT with 
GROUP BY clause
 Key: SPARK-7032
 URL: https://issues.apache.org/jira/browse/SPARK-7032
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.2
Reporter: Lior Chaga


When using a UNION/EXCEPT clause together with a GROUP BY clause in Spark SQL, the 
results do not match what is expected.

In the following example, only 1 record should be in the first table and not in the 
second (when grouping by the key field, the counter for key=1 is 10 in both tables).
Each of the clauses works properly when run on its own.


{code}
//import com.addthis.metrics.reporter.config.ReporterConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.Row;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SimpleApp {

public static void main(String[] args) throws IOException {

SparkConf conf = new SparkConf().setAppName("Simple Application")
.setMaster("local[1]");
JavaSparkContext sc = new JavaSparkContext(conf);

List<MyObject> firstList = new ArrayList<MyObject>(2);
firstList.add(new MyObject(1, 10));
firstList.add(new MyObject(2, 10));

List<MyObject> secondList = new ArrayList<MyObject>(3);
secondList.add(new MyObject(1, 4));
secondList.add(new MyObject(1, 6));
secondList.add(new MyObject(2, 8));

JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

JavaSQLContext sqlc = new JavaSQLContext(sc);
sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1");
sqlc.sqlContext().cacheTable("table1");
sqlc.applySchema(secondRdd, MyObject.class).registerTempTable("table2");
sqlc.sqlContext().cacheTable("table2");

List<Row> firstMinusSecond = sqlc.sql(
"SELECT key, counter FROM table1 " +
"EXCEPT " +
"SELECT key, SUM(counter) FROM table2 " +
"GROUP BY key").collect();

System.out.println("num of rows in first but not in second = [" +
firstMinusSecond.size() + "]");

sc.close();
System.exit(0);
}

public static class MyObject implements Serializable {

public MyObject(Integer key, Integer counter) {
this.key = key;
this.counter = counter;
}

private Integer key;
private Integer counter;

public Integer getKey() {
return key;
}

public void setKey(Integer key) {
this.key = key;
}

public Integer getCounter() {
return counter;
}

public void setCounter(Integer counter) {
this.counter = counter;
}
}

}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause

2015-04-26 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14512951#comment-14512951
 ] 

Lior Chaga commented on SPARK-7032:
---

I noticed that if I use the same group by for both data frames, the code works:

{code}
DataFrame firstAggregated = firstDataFrame.groupBy("key").sum("counter");
DataFrame secondAggregated = secondDataFrame.groupBy("key").sum("counter");
Row[] collectAgg = firstAggregated.except(secondAggregated).collect(); // 
returns one row, which is correct
{code}

I tried it only on 1.3.1, but I assume the same would work with 1.2.x.

 SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause
 ---

 Key: SPARK-7032
 URL: https://issues.apache.org/jira/browse/SPARK-7032
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.2, 1.3.1
Reporter: Lior Chaga


[jira] [Updated] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause

2015-04-26 Thread Lior Chaga (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga updated SPARK-7032:
--
Description: 
When using a UNION/EXCEPT clause together with a GROUP BY clause in Spark SQL, the 
results do not match what is expected.

In the following example, only 1 record should be in the first table and not in the 
second (when grouping by the key field, the counter for key=1 is 10 in both tables).
Each of the clauses works properly when run on its own.


{code}
//import com.addthis.metrics.reporter.config.ReporterConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.Row;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SimpleApp {

public static void main(String[] args) throws IOException {

SparkConf conf = new SparkConf().setAppName("Simple Application")
.setMaster("local[1]");
JavaSparkContext sc = new JavaSparkContext(conf);

List<MyObject> firstList = new ArrayList<MyObject>(2);
firstList.add(new MyObject(1, 10));
firstList.add(new MyObject(2, 10));

List<MyObject> secondList = new ArrayList<MyObject>(3);
secondList.add(new MyObject(1, 4));
secondList.add(new MyObject(1, 6));
secondList.add(new MyObject(2, 8));

JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

JavaSQLContext sqlc = new JavaSQLContext(sc);
sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1");
sqlc.sqlContext().cacheTable("table1");
sqlc.applySchema(secondRdd, MyObject.class).registerTempTable("table2");
sqlc.sqlContext().cacheTable("table2");

List<Row> firstMinusSecond = sqlc.sql(
"SELECT key, counter FROM table1 " +
"EXCEPT " +
"SELECT key, SUM(counter) FROM table2 " +
"GROUP BY key").collect();

System.out.println("num of rows in first but not in second = [" +
firstMinusSecond.size() + "]");

sc.close();
System.exit(0);
}

public static class MyObject implements Serializable {

public MyObject(Integer key, Integer counter) {
this.key = key;
this.counter = counter;
}

private Integer key;
private Integer counter;

public Integer getKey() {
return key;
}

public void setKey(Integer key) {
this.key = key;
}

public Integer getCounter() {
return counter;
}

public void setCounter(Integer counter) {
this.counter = counter;
}
}

}
{code}

[jira] [Updated] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause

2015-04-26 Thread Lior Chaga (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga updated SPARK-7032:
--
  Description: 
When using a UNION/EXCEPT clause together with a GROUP BY clause in Spark SQL, the 
results do not match what is expected.

In the following example, only 1 record should be in the first table and not in the 
second (when grouping by the key field, the counter for key=1 is 10 in both tables).
Each of the clauses works properly when run on its own.


{code}
//import com.addthis.metrics.reporter.config.ReporterConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.Row;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SimpleApp {

public static void main(String[] args) throws IOException {

SparkConf conf = new SparkConf().setAppName("Simple Application")
.setMaster("local[1]");
JavaSparkContext sc = new JavaSparkContext(conf);

List<MyObject> firstList = new ArrayList<MyObject>(2);
firstList.add(new MyObject(1, 10));
firstList.add(new MyObject(2, 10));

List<MyObject> secondList = new ArrayList<MyObject>(3);
secondList.add(new MyObject(1, 4));
secondList.add(new MyObject(1, 6));
secondList.add(new MyObject(2, 8));

JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

JavaSQLContext sqlc = new JavaSQLContext(sc);
sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1");
sqlc.sqlContext().cacheTable("table1");
sqlc.applySchema(secondRdd, MyObject.class).registerTempTable("table2");
sqlc.sqlContext().cacheTable("table2");

List<Row> firstMinusSecond = sqlc.sql(
"SELECT key, counter FROM table1 " +
"EXCEPT " +
"SELECT key, SUM(counter) FROM table2 " +
"GROUP BY key").collect();

System.out.println("num of rows in first but not in second = [" +
firstMinusSecond.size() + "]");

sc.close();
System.exit(0);
}

public static class MyObject implements Serializable {

public MyObject(Integer key, Integer counter) {
this.key = key;
this.counter = counter;
}

private Integer key;
private Integer counter;

public Integer getKey() {
return key;
}

public void setKey(Integer key) {
this.key = key;
}

public Integer getCounter() {
return counter;
}

public void setCounter(Integer counter) {
this.counter = counter;
}
}

}
{code}


The same example, more or less, with DataFrames: without groupBy it works fine, but 
with groupBy I get 2 rows instead of 1:

{code}
SparkConf conf = new SparkConf().setAppName("Simple Application")
.setMaster("local[1]");
JavaSparkContext sc = new JavaSparkContext(conf);

List<MyObject> firstList = new ArrayList<MyObject>(2);
firstList.add(new MyObject(1, 10));
firstList.add(new MyObject(2, 10));

List<MyObject> secondList = new ArrayList<MyObject>(3);
secondList.add(new MyObject(1, 10));
secondList.add(new MyObject(2, 8));

JavaRDD<MyObject> firstRdd = sc.parallelize(firstList);
JavaRDD<MyObject> secondRdd = sc.parallelize(secondList);

SQLContext sqlc = new SQLContext(sc);

DataFrame firstDataFrame = sqlc.createDataFrame(firstRdd, MyObject.class);
DataFrame secondDataFrame = sqlc.createDataFrame(secondRdd, MyObject.class);

Row[] collect = firstDataFrame.except(secondDataFrame).collect();
System.out.println("num of rows in first but not in second = [" +
collect.length + "]");

DataFrame secondAggregated = secondDataFrame.groupBy("key").sum("counter");
Row[] collectAgg = firstDataFrame.except(secondAggregated).collect();
System.out.println("num of rows in first but not in second = [" +
collectAgg.length + "]"); // should be 1 row, but there are 2
{code}


[jira] [Closed] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause

2015-05-28 Thread Lior Chaga (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga closed SPARK-7032.
-
Resolution: Not A Problem

 SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause
 ---

 Key: SPARK-7032
 URL: https://issues.apache.org/jira/browse/SPARK-7032
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.2, 1.3.1
Reporter: Lior Chaga




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause

2015-05-28 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14562641#comment-14562641
 ] 

Lior Chaga commented on SPARK-7032:
---

[~viirya], the following modification to the code solves the problem with 
DataFrames:

{code}
DataFrame secondAggregated = secondDataFrame.groupBy("key").sum("counter")
.withColumnRenamed("SUM(counter)", "counter");
Row[] collectAgg = firstDataFrame.select("counter", "key")
.except(secondAggregated.select("counter", "key")).collect();
{code}

or simply:
{code}
Row[] collectAgg = firstDataFrame.select("counter", "key")
.except(secondAggregated.select("SUM(counter)", "key")).collect();
{code}

Thanks for your help!

 SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause
 ---

 Key: SPARK-7032
 URL: https://issues.apache.org/jira/browse/SPARK-7032
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.2, 1.3.1
Reporter: Lior Chaga


[jira] [Commented] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause

2015-05-28 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14562634#comment-14562634
 ] 

Lior Chaga commented on SPARK-7032:
---

You are right. The first example, with the explicit select clause, does work with the 
latest code base.
So I guess the DataFrames example isn't working because of the Java Beans limitation, 
as you suggested.
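For context, a small illustration of that limitation (hedged, not from the original 
report): a DataFrame created from a Java Bean orders its columns alphabetically, while 
the aggregated frame keeps the groupBy/aggregate order, so except() ends up comparing 
differently ordered columns. Printing the schemas of the frames from the DataFrame 
example in the description makes this visible:

{code}
// Illustrative only - assumes firstDataFrame and secondAggregated from the
// DataFrame example in the issue description.
firstDataFrame.printSchema();   // bean-derived columns, e.g. counter, key
secondAggregated.printSchema(); // aggregated columns, e.g. key, SUM(counter)
{code}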


 SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause
 ---

 Key: SPARK-7032
 URL: https://issues.apache.org/jira/browse/SPARK-7032
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.2, 1.3.1
Reporter: Lior Chaga




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause

2015-05-26 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14558803#comment-14558803
 ] 

Lior Chaga commented on SPARK-7032:
---

[~viirya], the problem still occurs with the latest codebase.

 SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause
 ---

 Key: SPARK-7032
 URL: https://issues.apache.org/jira/browse/SPARK-7032
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.2, 1.3.1
Reporter: Lior Chaga




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: 

[jira] [Commented] (SPARK-7032) SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause

2015-05-26 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14558989#comment-14558989
 ] 

Lior Chaga commented on SPARK-7032:
---

Thanks, [~viirya].
I'm not sure this is the problem, as the first code snippet I used (with Spark 1.2.1) 
uses explicit query fields and not SELECT *.

Anyway, this is obviously a simplified example. In reality I'm creating DataFrames 
from two RDDs I load from different datasources, so using simple arrays is not an 
option.
Also, using Scala is not an option in the near future, because in a medium-size 
company these things are not spontaneous.

Fortunately, I'm working around it by doing the aggregation on the RDDs before 
creating the data frames, so I avoid the group by altogether.
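A rough sketch of that workaround, assuming the MyObject bean, secondRdd, sqlc and 
firstDataFrame from the DataFrame example in the issue description (Java 8 lambdas; 
the names are illustrative, not the actual production code):

{code}
// requires: import scala.Tuple2; import org.apache.spark.api.java.JavaPairRDD;
// Aggregate on the RDD first, then build the DataFrame, so no groupBy is needed
// and both frames keep the same bean-derived column order.
JavaPairRDD<Integer, Integer> summed = secondRdd
        .mapToPair(o -> new Tuple2<Integer, Integer>(o.getKey(), o.getCounter()))
        .reduceByKey((a, b) -> a + b);
JavaRDD<MyObject> aggregatedRdd = summed
        .map(t -> new MyObject(t._1(), t._2()));
DataFrame secondAggregated = sqlc.createDataFrame(aggregatedRdd, MyObject.class);
Row[] collectAgg = firstDataFrame.except(secondAggregated).collect(); // expect 1 row
{code}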


 SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause
 ---

 Key: SPARK-7032
 URL: https://issues.apache.org/jira/browse/SPARK-7032
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.2, 1.3.1
Reporter: Lior Chaga


[jira] [Commented] (SPARK-4300) Race condition during SparkWorker shutdown

2015-08-10 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14680154#comment-14680154
 ] 

Lior Chaga commented on SPARK-4300:
---

This also occurs in Spark 1.4:

{panel}
12:31:10.821 [File appending thread for 
/var/lib/spark/data/disk1/app-20150809122638-/13/stdout] ERROR 
org.apache.spark.util.logging.FileAppender - Error writing stream to file 
/var/lib/spark/data/disk1/app-20150809122638-/13/stdout
java.io.IOException: Stream closed
at 
java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:145) 
~[?:1.6.0_41]
at java.io.BufferedInputStream.read1(BufferedInputStream.java:255) 
~[?:1.6.0_41]
at java.io.BufferedInputStream.read(BufferedInputStream.java:317) 
~[?:1.6.0_41]
at java.io.FilterInputStream.read(FilterInputStream.java:90) 
~[?:1.6.0_41]
at 
org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
 [spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
 [spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
 [spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
 [spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772) 
[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38) 
[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
{panel}

And later on I see:
{panel}
12:22:30.861 [sparkWorker-akka.actor.default-dispatcher-2] ERROR 
akka.actor.ActorSystemImpl - Uncaught fatal error from thread 
[sparkWorker-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem 
[sparkWorker]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.spark_project.protobuf.ByteString.copyFrom(ByteString.java:192) 
~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at org.spark_project.protobuf.ByteString.copyFrom(ByteString.java:204) 
~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
akka.remote.serialization.MessageContainerSerializer.serializeSelection(MessageContainerSerializer.scala:36)
 ~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
akka.remote.serialization.MessageContainerSerializer.toBinary(MessageContainerSerializer.scala:25)
 ~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) 
~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
 ~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
 ~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) 
~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:747) 
~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:722) 
~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415) 
~[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 [spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 [spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [spark-assembly-1.4.0-hadoop2.2.0.jar:1.4.0]
Exception in thread qtp1853216600-31 

[jira] [Commented] (SPARK-9096) Unevenly distributed task loads after using JavaRDD.subtract()

2015-09-06 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14732230#comment-14732230
 ] 

Lior Chaga commented on SPARK-9096:
---

I would suggest the following alternative to changing the partitioner:
1. mapToPair rddVectors and assign a 0/1 value to each vector according to the 
desired sample ratio
2. filter the resulting pair RDD by value
3. map each pair RDD back to the training and test vector RDDs

I believe the sample-subtract approach is less performant than what I suggested (a 
rough sketch is below).
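A minimal sketch of that split, assuming an input JavaRDD<Vector> named rddVectors and 
a sample ratio of 0.8 (names and types are illustrative, not taken from the attached 
ReproduceBug.java):

{code}
// requires: import scala.Tuple2; import org.apache.spark.api.java.JavaPairRDD;
//           plus the vector type in use, e.g. org.apache.spark.mllib.linalg.Vector
// Tag each vector once with 0 or 1, then split by the tag - no sample()/subtract() needed.
final double sampleRatio = 0.8;
JavaPairRDD<Integer, Vector> tagged = rddVectors
        .mapToPair(v -> new Tuple2<Integer, Vector>(Math.random() < sampleRatio ? 1 : 0, v));
JavaRDD<Vector> trainingVectors = tagged.filter(t -> t._1() == 1).map(Tuple2::_2);
JavaRDD<Vector> testVectors = tagged.filter(t -> t._1() == 0).map(Tuple2::_2);
{code}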

> Unevenly distributed task loads after using JavaRDD.subtract()
> --
>
> Key: SPARK-9096
> URL: https://issues.apache.org/jira/browse/SPARK-9096
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.4.0, 1.4.1
>Reporter: Gisle Ytrestøl
>Priority: Minor
> Attachments: ReproduceBug.java, hanging-one-task.jpg, 
> reproduce.1.3.1.log.gz, reproduce.1.4.1.log.gz
>
>
> When using JavaRDD.subtract(), it seems that the tasks are unevenly 
> distributed in the following operations on the new JavaRDD which is 
> created by "subtract". The result is that in the following operation on the 
> new JavaRDD, a few tasks process almost all the data, and these tasks will 
> take a long time to finish. 
> I've reproduced this bug in the attached Java file, which I submit with 
> spark-submit. 
> The logs for 1.3.1 and 1.4.1 are attached. In 1.4.1, we see that a few tasks 
> in the count job takes a lot of time:
> 15/07/16 09:13:17 INFO TaskSetManager: Finished task 1459.0 in stage 2.0 (TID 
> 4659) in 708 ms on 148.251.190.217 (1597/1600)
> 15/07/16 09:13:17 INFO TaskSetManager: Finished task 1586.0 in stage 2.0 (TID 
> 4786) in 772 ms on 148.251.190.217 (1598/1600)
> 15/07/16 09:17:51 INFO TaskSetManager: Finished task 1382.0 in stage 2.0 (TID 
> 4582) in 275019 ms on 148.251.190.217 (1599/1600)
> 15/07/16 09:20:02 INFO TaskSetManager: Finished task 1230.0 in stage 2.0 (TID 
> 4430) in 407020 ms on 148.251.190.217 (1600/1600)
> 15/07/16 09:20:02 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks 
> have all completed, from pool 
> 15/07/16 09:20:02 INFO DAGScheduler: ResultStage 2 (count at 
> ReproduceBug.java:56) finished in 420.024 s
> 15/07/16 09:20:02 INFO DAGScheduler: Job 0 finished: count at 
> ReproduceBug.java:56, took 442.941395 s
> In comparison, all tasks are more or less equal in size when running the same 
> application in Spark 1.3.1. Overall, the attached application (ReproduceBug.java) 
> takes about 7 minutes on Spark 1.4.1 and completes in roughly 30 seconds on 
> Spark 1.3.1. 
> Spark 1.4.0 behaves similar to Spark 1.4.1 wrt this issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14919) Spark Cannot be used with software that requires jackson-databind 2.6+: RDDOperationScope

2016-05-31 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15307622#comment-15307622
 ] 

Lior Chaga commented on SPARK-14919:


[~srowen], I totally agree that things get done by people. But when I see a pull 
request closed as "Won't Fix" and lots of tickets resolved as "Not a Problem", then 
reading between the lines I get the impression that the leading Spark contributors 
are against changing it; otherwise I'd expect at least an open ticket for it (and 
perhaps I just got the wrong impression). 
I'll try to make some time available to handle it soon.

Thanks.

> Spark Cannot be used with software that requires jackson-databind 2.6+: 
> RDDOperationScope
> -
>
> Key: SPARK-14919
> URL: https://issues.apache.org/jira/browse/SPARK-14919
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 1.6.1
> Environment: Linux, OSX
>Reporter: John Ferguson
>
> When using Spark 1.4.x or Spark 1.6.1 in an application that has a front end 
> requiring jackson-databind 2.6+, we see the following exceptions:
> Subset of stack trace:
> ==
> com.fasterxml.jackson.databind.JsonMappingException: Could not find creator 
> property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope)
>  at [Source: {"id":"0","name":"textFile"}; line: 1, column: 1]
>   at 
> com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
>   at 
> com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
>   at 
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
>   at 
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
>   at 
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
>   at 
> com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:405)
>   at 
> com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:354)
>   at 
> com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:262)
>   at 
> com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:242)
>   at 
> com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)
>   at 
> com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:439)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3664)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3556)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2576)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:85)
>   at 
> org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:136)
>   at scala.Option.map(Option.scala:145)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>   at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>   at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:1011)
>   at 
> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:832)
>   at 
> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:830)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>   at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>   at org.apache.spark.SparkContext.textFile(SparkContext.scala:830)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14919) Spark Cannot be used with software that requires jackson-databind 2.6+: RDDOperationScope

2016-05-31 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15307631#comment-15307631
 ] 

Lior Chaga commented on SPARK-14919:


Well, my experience with user classpath first is not so good; I can't recall the 
exact issue I had.
BTW, is there any point in fixing this in the 1.x branch? Are there any planned 
releases?
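For reference, this is the setting I mean (a rough sketch using the standard 
spark.driver/executor.userClassPathFirst options; whether it actually resolves the 
jackson conflict is exactly what I'm unsure about):

{code}
// requires: import org.apache.spark.SparkConf;
// Ask Spark to prefer the application's jars (e.g. a newer jackson-databind)
// over the copies bundled in the Spark assembly. Both options are experimental.
SparkConf conf = new SparkConf()
        .setAppName("jackson-conflict-check")
        .set("spark.driver.userClassPathFirst", "true")
        .set("spark.executor.userClassPathFirst", "true");
{code}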

> Spark Cannot be used with software that requires jackson-databind 2.6+: 
> RDDOperationScope
> -
>
> Key: SPARK-14919
> URL: https://issues.apache.org/jira/browse/SPARK-14919
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 1.6.1
> Environment: Linux, OSX
>Reporter: John Ferguson
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14919) Spark Cannot be used with software that requires jackson-databind 2.6+: RDDOperationScope

2016-05-31 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15307606#comment-15307606
 ] 

Lior Chaga commented on SPARK-14919:


I totally agree with [~fergjo00]: if shading is not so uncommon, why not just do it 
in Spark? 
Why does every application that uses Spark have to tackle this on its own side? 
Just looking at the number of reported issues concerning fasterxml and Spark, it 
seems to be a real concern. One of them (SPARK-13022) even included a pull request, 
so I really don't get it.

> Spark Cannot be used with software that requires jackson-databind 2.6+: 
> RDDOperationScope
> -
>
> Key: SPARK-14919
> URL: https://issues.apache.org/jira/browse/SPARK-14919
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 1.6.1
> Environment: Linux, OSX
>Reporter: John Ferguson
>
> When using Spark 1.4.x or Spark 1.6.1 in an application that has a front end 
> requiring jackson-databind 2.6+, we see the following exceptions:
> Subset of stack trace:
> ==
> com.fasterxml.jackson.databind.JsonMappingException: Could not find creator 
> property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope)
>  at [Source: {"id":"0","name":"textFile"}; line: 1, column: 1]
>   at 
> com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
>   at 
> com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
>   at 
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
>   at 
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
>   at 
> com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
>   at 
> com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:405)
>   at 
> com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:354)
>   at 
> com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:262)
>   at 
> com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:242)
>   at 
> com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)
>   at 
> com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:439)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3664)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3556)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2576)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:85)
>   at 
> org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:136)
>   at scala.Option.map(Option.scala:145)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>   at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>   at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:1011)
>   at 
> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:832)
>   at 
> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:830)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>   at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>   at org.apache.spark.SparkContext.textFile(SparkContext.scala:830)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-13567) Document spark.shuffle.reduceLocality.enabled configuration

2016-02-29 Thread Lior Chaga (JIRA)
Lior Chaga created SPARK-13567:
--

 Summary: Document spark.shuffle.reduceLocality.enabled 
configuration
 Key: SPARK-13567
 URL: https://issues.apache.org/jira/browse/SPARK-13567
 Project: Spark
  Issue Type: Documentation
  Components: Shuffle
Affects Versions: 1.6.0
Reporter: Lior Chaga


spark.shuffle.reduceLocality.enabled should be documented in configuration



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13567) Document spark.shuffle.reduceLocality.enabled configuration

2016-02-29 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15171887#comment-15171887
 ] 

Lior Chaga commented on SPARK-13567:


Well, given my findings (see SPARK-10567), I think it should, at least until 
the issue is resolved.

> Document spark.shuffle.reduceLocality.enabled configuration
> ---
>
> Key: SPARK-13567
> URL: https://issues.apache.org/jira/browse/SPARK-13567
> Project: Spark
>  Issue Type: Documentation
>  Components: Shuffle
>Affects Versions: 1.6.0
>Reporter: Lior Chaga
>Priority: Minor
>
> spark.shuffle.reduceLocality.enabled should be documented in configuration



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10567) Reducer locality follow-up for Spark 1.6

2016-02-29 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15171879#comment-15171879
 ] 

Lior Chaga commented on SPARK-10567:


Hi,
I have a Spark job that does the following:

Given a list of days (usually one day), it extracts a list of file keys from an 
index table in Cassandra, and then repartitions with a HashPartitioner 
according to file name.
It then flat-maps each file name with a function that extracts the relevant 
file from Cassandra (stored as a blob) and breaks it into protostuff messages.

With Spark 1.4 the tasks were distributed beautifully among executors, but 
after upgrading to 1.6 I noticed that most of the tasks are offered to a single 
executor. 
After digging a lot I found this ticket, and after setting 
spark.shuffle.reduceLocality.enabled to false the tasks were evenly distributed 
between executors again.

I think this ticket should be reopened.
I also opened a ticket about the missing documentation - SPARK-13567.
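
For reference, a minimal sketch of the workaround, assuming the Spark 1.6 Java 
API (the RDD pipeline itself is omitted):

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ReduceLocalityWorkaround {
    public static void main(String[] args) {
        // spark.shuffle.reduceLocality.enabled is an undocumented flag (see SPARK-13567);
        // setting it to false restored the even task distribution described above.
        SparkConf conf = new SparkConf()
                .setAppName("reduce-locality-workaround")
                .set("spark.shuffle.reduceLocality.enabled", "false");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... repartition the file keys by file name and flatMap each file
        //     (a Cassandra blob) into protostuff messages ...
        sc.stop();
    }
}
{code}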

> Reducer locality follow-up for Spark 1.6
> 
>
> Key: SPARK-10567
> URL: https://issues.apache.org/jira/browse/SPARK-10567
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Reporter: Yin Huai
>Assignee: Matei Zaharia
>Priority: Blocker
> Fix For: 1.6.0
>
>
> For Spark 1.6, let's check that the issue mentioned in 
> https://github.com/apache/spark/pull/8280 is fixed when 
> spark.shuffle.reduceLocality.enabled is true.
> Otherwise, we should disable it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12836) spark enable both driver run executor & write to HDFS

2016-03-16 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197120#comment-15197120
 ] 

Lior Chaga commented on SPARK-12836:


I used the --no-switch_user Mesos config, and it worked. Writing to Hadoop was 
done with HADOOP_USER_NAME, while the Spark executors were running with the 
mesos-slave user's permissions.
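
A minimal sketch of that setup, assuming the Mesos agents are started with 
--no-switch_user; the user name below is only the one from the issue's 
environment, and the driver side simply exports the same HADOOP_USER_NAME 
before spark-submit:

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class HdfsUserExample {
    public static void main(String[] args) {
        // Executors run with the mesos-slave user's permissions (agents started with
        // --no-switch_user), while HDFS writes are attributed to HADOOP_USER_NAME.
        SparkConf conf = new SparkConf()
                .setAppName("hdfs-user-example")
                .set("spark.executorEnv.HADOOP_USER_NAME", "qhstats"); // placeholder user
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... job logic that writes to HDFS ...
        sc.stop();
    }
}
{code}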

> spark enable both driver run executor & write to HDFS
> -
>
> Key: SPARK-12836
> URL: https://issues.apache.org/jira/browse/SPARK-12836
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos, Scheduler, Spark Core
>Affects Versions: 1.6.0
> Environment: HADOOP_USER_NAME=qhstats
> SPARK_USER=root
>Reporter: astralidea
>  Labels: features
>
> When the HADOOP_USER_NAME env variable is set, CoarseMesosSchedulerBackend 
> sets the Spark user from this env. But in my cluster Spark must run as root, 
> while writing to HDFS requires HADOOP_USER_NAME; a configuration is needed to 
> run the executor as root and write to HDFS as another user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21795) Broadcast hint ignored when dataframe is cached

2017-08-22 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136891#comment-16136891
 ] 

Lior Chaga commented on SPARK-21795:


Hi, what I'm saying is that if I use a broadcast join hint on a DF, I expect 
BroadcastHashJoin to be used, even if this DF was previously cached (or, if 
not, then it should be documented that broadcast doesn't work with a cached DF).

My second claim is that one might have multiple queries in a Spark session, and 
dataframes may be reused across different queries. So it's not entirely 
impossible that one would like to benefit from caching a DF in one query and 
broadcasting the same DF in another, unrelated query. But this is just a 
general statement; personally I don't have such a use case.
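
For what it's worth, a minimal sketch of how the chosen join strategy can be 
checked; the data and names here are illustrative only:

{code:java}
import static org.apache.spark.sql.functions.broadcast;

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BroadcastHintCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]").appName("broadcast-hint-check").getOrCreate();

        Dataset<Row> largeDf = spark.createDataset(Arrays.asList("a", "b", "c", "d"), Encoders.STRING()).toDF();
        Dataset<Row> smallDf = spark.createDataset(Arrays.asList("b", "c"), Encoders.STRING()).toDF();

        smallDf = smallDf.cache(); // per this report, caching causes the hint to be ignored in 2.2.0

        Dataset<Row> joined = largeDf.join(broadcast(smallDf), "value");
        joined.explain(); // look for BroadcastHashJoin vs SortMergeJoin in the physical plan
        joined.count();

        spark.stop();
    }
}
{code}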



> Broadcast hint ignored when dataframe is cached
> ---
>
> Key: SPARK-21795
> URL: https://issues.apache.org/jira/browse/SPARK-21795
> Project: Spark
>  Issue Type: Question
>  Components: Documentation, SQL
>Affects Versions: 2.2.0
>Reporter: Lior Chaga
>Priority: Minor
>
> Not sure if it's a bug or by design, but if a DF is cached, the broadcast 
> hint is ignored, and spark uses SortMergeJoin.
> {code}
> val largeDf = ...
> val smalDf = ...
> smallDf = smallDf.cache
> largeDf.join(broadcast(smallDf))
> {code}
> It make sense there's no need to use cache when using broadcast join, 
> however, I wonder if it's the correct behavior for spark to ignore the 
> broadcast hint just because the DF is cached. Consider a case when a DF 
> should be cached for several queries, and on different queries it should be 
> broadcasted.
> If this is the correct behavior, at least it's worth documenting that cached 
> DF cannot be broadcasted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21795) Broadcast hint ignored when dataframe is cached

2017-08-20 Thread Lior Chaga (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga updated SPARK-21795:
---
Description: 
Not sure if it's a bug or by design, but if a DF is cached, the broadcast hint 
is ignored, and spark uses SortMergeJoin.

{code}
val largeDf = ...
var smallDf = ...
smallDf = smallDf.cache

largeDf.join(broadcast(smallDf))

{code}

It makes sense that there's no need to use cache when using a broadcast join; 
however, I wonder whether it's the correct behavior for Spark to ignore the 
broadcast hint just because the DF is cached. Consider a case where a DF should 
be cached for several queries, and in other queries it should be broadcasted.

If this is the correct behavior, it's at least worth documenting that a cached 
DF cannot be broadcasted.

  was:
Not sure if it's a bug or by design, but if a DF is cached, the broadcast hint 
is ignored, and spark uses SortMergeJoin.

{{code}}
val largeDf = ...
val smalDf = ...
smallDf = smallDf.cache

largeDf.join(broadcast(smallDf))

{{code}}

It make sense there's no need to use cache when using broadcast join, however, 
I wonder if it's the correct behavior for spark to ignore the broadcast hint 
just because the DF is cached. Consider a case when a DF should be cached for 
several queries, and on different queries it should be broadcasted.

If this is the correct behavior, at least it's worth documenting that cached DF 
cannot be broadcasted.


> Broadcast hint ignored when dataframe is cached
> ---
>
> Key: SPARK-21795
> URL: https://issues.apache.org/jira/browse/SPARK-21795
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Lior Chaga
>Priority: Minor
>
> Not sure if it's a bug or by design, but if a DF is cached, the broadcast 
> hint is ignored, and spark uses SortMergeJoin.
> {code}
> val largeDf = ...
> val smalDf = ...
> smallDf = smallDf.cache
> largeDf.join(broadcast(smallDf))
> {code}
> It make sense there's no need to use cache when using broadcast join, 
> however, I wonder if it's the correct behavior for spark to ignore the 
> broadcast hint just because the DF is cached. Consider a case when a DF 
> should be cached for several queries, and on different queries it should be 
> broadcasted.
> If this is the correct behavior, at least it's worth documenting that cached 
> DF cannot be broadcasted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21795) Broadcast hint ignored when dataframe is cached

2017-08-20 Thread Lior Chaga (JIRA)
Lior Chaga created SPARK-21795:
--

 Summary: Broadcast hint ignored when dataframe is cached
 Key: SPARK-21795
 URL: https://issues.apache.org/jira/browse/SPARK-21795
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Lior Chaga
Priority: Minor


Not sure if it's a bug or by design, but if a DF is cached, the broadcast hint 
is ignored, and spark uses SortMergeJoin.

{{code}}
val largeDf = ...
val smalDf = ...
smallDf = smallDf.cache

largeDf.join(broadcast(smallDf))

{{code}}

It make sense there's no need to use cache when using broadcast join, however, 
I wonder if it's the correct behavior for spark to ignore the broadcast hint 
just because the DF is cached. Consider a case when a DF should be cached for 
several queries, and on different queries it should be broadcasted.

If this is the correct behavior, at least it's worth documenting that cached DF 
cannot be broadcasted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22840) Incorrect results when using distinct on window

2017-12-19 Thread Lior Chaga (JIRA)
Lior Chaga created SPARK-22840:
--

 Summary: Incorrect results when using distinct on window
 Key: SPARK-22840
 URL: https://issues.apache.org/jira/browse/SPARK-22840
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Lior Chaga


Given the following schema:
{code}
root
 |-- id: string (nullable = true)
 |-- start_time: long (nullable = true)
 |-- stats: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- calibratedRecsHistory: double (nullable = true)
 |||-- eventTime: long (nullable = true)
 |||-- itemId: long (nullable = true)
 |||-- recsHistory: long (nullable = true)
{code}

The data contains multiple rows per id and start_time, and all stats elements 
for a specific id and start_time are identical across those rows. I've noticed 
inconsistent results when using a window with FIRST(stats) ordered DESC versus 
LAST(stats) ordered ASC.
Specifically, the latter (LAST with ASC) produces more results.

This is the query that shows it:

{code}
SELECT DISTINCT
id ,
LAST(stats) over w
FROM sample
WINDOW w AS (PARTITION BY id  SORT BY start_time DESC)
except
SELECT DISTINCT
id ,
FIRST(stats) over w 
FROM sample
WINDOW w AS (PARTITION BY id  SORT BY start_time ASC)
{code}

Each of the subqueries should return the stats for the latest start_time, 
partitioned by id.
Changing the order of the subqueries returns nothing...
The query with FIRST and ASC produces correct results.

The data for the sample is attached in sample.parquet.zip.
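
For reference: when a window specification has an ORDER BY / SORT BY but no 
explicit frame, Spark defaults to a growing frame (RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW), so LAST(stats) returns the current row's value 
rather than the last row of the partition, which would explain why the two 
subqueries differ. A minimal sketch with an explicit full-partition frame, 
assuming the attached sample is registered as a table named sample (the path 
below is a placeholder):

{code:java}
import org.apache.spark.sql.SparkSession;

public class WindowFrameCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]").appName("window-frame-check").getOrCreate();
        spark.read().parquet("sample.parquet").createOrReplaceTempView("sample"); // placeholder path

        // With the frame spelled out, LAST is evaluated over the whole partition,
        // so LAST with ASC ordering and FIRST with DESC ordering return the same rows.
        spark.sql(
            "SELECT DISTINCT id, LAST(stats) OVER w AS stats FROM sample " +
            "WINDOW w AS (PARTITION BY id ORDER BY start_time " +
            "             ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)")
            .show(false);

        spark.stop();
    }
}
{code}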



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22840) Incorrect results when using distinct on window

2017-12-19 Thread Lior Chaga (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga updated SPARK-22840:
---
Description: 
Given the following schema:
{code}
root
 |-- id: string (nullable = true)
 |-- start_time: long (nullable = true)
 |-- stats: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- calibratedRecsHistory: double (nullable = true)
 |||-- eventTime: long (nullable = true)
 |||-- itemId: long (nullable = true)
 |||-- recsHistory: long (nullable = true)
{code}

Data contains multiple rows per id and start_time, with all stats elements for 
a specific id and start_time is identical in all rows, I've noticed 
inconsistent results when using window with FIRST(stats) DESC, and LAST(stats) 
ASC.
Specifically, the latter (LAST with ASC) produces more results.

This is the query for seeing that:

{code}
SELECT DISTINCT
id ,
LAST(stats) over w
FROM sample
WINDOW w AS (PARTITION BY id  SORT BY start_time DESC)
except
SELECT DISTINCT
id ,
FIRST(stats) over w 
FROM sample
WINDOW w AS (PARTITION BY id  SORT BY start_time ASC)
{code}

Each of the subqueries should return the stats for the latest start_time, 
partitioned by id.
Changing the order of the subqueries returns nothing...
The query with FIRST and ASC produces correct results.

the data for sample is attached in [^sample.parquet.zip]

  was:
Given the following schema:
{code}
root
 |-- id: string (nullable = true)
 |-- start_time: long (nullable = true)
 |-- stats: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- calibratedRecsHistory: double (nullable = true)
 |||-- eventTime: long (nullable = true)
 |||-- itemId: long (nullable = true)
 |||-- recsHistory: long (nullable = true)
{code}

Data contains multiple rows per id and start_time, with all stats elements for 
a specific id and start_time is identical in all rows, I've noticed 
inconsistent results when using window with FIRST(stats) DESC, and LAST(stats) 
ASC.
Specifically, the latter (LAST with ASC) produces more results.

This is the query for seeing that:

{code}
SELECT DISTINCT
id ,
LAST(stats) over w
FROM sample
WINDOW w AS (PARTITION BY id  SORT BY start_time DESC)
except
SELECT DISTINCT
id ,
FIRST(stats) over w 
FROM sample
WINDOW w AS (PARTITION BY id  SORT BY start_time ASC)
{code}

Each of the subqueries should return the stats for the latest start_time, 
partitioned by id.
Changing the order of the subqueries returns nothing...
The query with FIRST and ASC produces correct results.

the data for sample is attached in sample.parquet.zip


> Incorrect results when using distinct on window
> ---
>
> Key: SPARK-22840
> URL: https://issues.apache.org/jira/browse/SPARK-22840
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Lior Chaga
> Attachments: sample.parquet.zip
>
>
> Given the following schema:
> {code}
> root
>  |-- id: string (nullable = true)
>  |-- start_time: long (nullable = true)
>  |-- stats: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- calibratedRecsHistory: double (nullable = true)
>  |||-- eventTime: long (nullable = true)
>  |||-- itemId: long (nullable = true)
>  |||-- recsHistory: long (nullable = true)
> {code}
> Data contains multiple rows per id and start_time, with all stats elements 
> for a specific id and start_time is identical in all rows, I've noticed 
> inconsistent results when using window with FIRST(stats) DESC, and 
> LAST(stats) ASC.
> Specifically, the latter (LAST with ASC) produces more results.
> This is the query for seeing that:
> {code}
> SELECT DISTINCT
> id ,
> LAST(stats) over w
> FROM sample
> WINDOW w AS (PARTITION BY id  SORT BY start_time DESC)
> except
> SELECT DISTINCT
> id ,
> FIRST(stats) over w 
> FROM sample
> WINDOW w AS (PARTITION BY id  SORT BY start_time ASC)
> {code}
> Each of the subqueries should return the stats for the latest start_time, 
> partitioned by id.
> Changing the order of the subqueries returns nothing...
> The query with FIRST and ASC produces correct results.
> the data for sample is attached in [^sample.parquet.zip]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22840) Incorrect results when using distinct on window

2017-12-19 Thread Lior Chaga (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga updated SPARK-22840:
---
Attachment: sample.parquet.zip

> Incorrect results when using distinct on window
> ---
>
> Key: SPARK-22840
> URL: https://issues.apache.org/jira/browse/SPARK-22840
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Lior Chaga
> Attachments: sample.parquet.zip
>
>
> Given the following schema:
> {code}
> root
>  |-- id: string (nullable = true)
>  |-- start_time: long (nullable = true)
>  |-- stats: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- calibratedRecsHistory: double (nullable = true)
>  |||-- eventTime: long (nullable = true)
>  |||-- itemId: long (nullable = true)
>  |||-- recsHistory: long (nullable = true)
> {code}
> Data contains multiple rows per id and start_time, with all stats elements 
> for a specific id and start_time is identical in all rows, I've noticed 
> inconsistent results when using window with FIRST(stats) DESC, and 
> LAST(stats) ASC.
> Specifically, the latter (LAST with ASC) produces more results.
> This is the query for seeing that:
> {code}
> SELECT DISTINCT
> id ,
> LAST(stats) over w
> FROM sample
> WINDOW w AS (PARTITION BY id  SORT BY start_time DESC)
> except
> SELECT DISTINCT
> id ,
> FIRST(stats) over w 
> FROM sample
> WINDOW w AS (PARTITION BY id  SORT BY start_time ASC)
> {code}
> Each of the subqueries should return the stats for the latest start_time, 
> partitioned by id.
> Changing the order of the subqueries returns nothing...
> The query with FIRST and ASC produces correct results.
> the data for sample is attached in sample.parquet.zip



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22840) Incorrect results when using distinct on window

2017-12-25 Thread Lior Chaga (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303284#comment-16303284
 ] 

Lior Chaga commented on SPARK-22840:


gotcha [~greenhat], thanks

> Incorrect results when using distinct on window
> ---
>
> Key: SPARK-22840
> URL: https://issues.apache.org/jira/browse/SPARK-22840
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Lior Chaga
> Attachments: sample.parquet.zip
>
>
> Given the following schema:
> {code}
> root
>  |-- id: string (nullable = true)
>  |-- start_time: long (nullable = true)
>  |-- stats: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- calibratedRecsHistory: double (nullable = true)
>  |||-- eventTime: long (nullable = true)
>  |||-- itemId: long (nullable = true)
>  |||-- recsHistory: long (nullable = true)
> {code}
> Data contains multiple rows per id and start_time, with all stats elements 
> for a specific id and start_time is identical in all rows, I've noticed 
> inconsistent results when using window with FIRST(stats) DESC, and 
> LAST(stats) ASC.
> Specifically, the latter (LAST with ASC) produces more results.
> This is the query for seeing that:
> {code}
> SELECT DISTINCT
> id ,
> LAST(stats) over w
> FROM sample
> WINDOW w AS (PARTITION BY id  SORT BY start_time DESC)
> except
> SELECT DISTINCT
> id ,
> FIRST(stats) over w 
> FROM sample
> WINDOW w AS (PARTITION BY id  SORT BY start_time ASC)
> {code}
> Each of the subqueries should return the stats for the latest start_time, 
> partitioned by id.
> Changing the order of the subqueries returns nothing...
> The query with FIRST and ASC produces correct results.
> the data for sample is attached in [^sample.parquet.zip]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22840) Incorrect results when using distinct on window

2017-12-25 Thread Lior Chaga (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga resolved SPARK-22840.

Resolution: Not A Bug

> Incorrect results when using distinct on window
> ---
>
> Key: SPARK-22840
> URL: https://issues.apache.org/jira/browse/SPARK-22840
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Lior Chaga
> Attachments: sample.parquet.zip
>
>
> Given the following schema:
> {code}
> root
>  |-- id: string (nullable = true)
>  |-- start_time: long (nullable = true)
>  |-- stats: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- calibratedRecsHistory: double (nullable = true)
>  |||-- eventTime: long (nullable = true)
>  |||-- itemId: long (nullable = true)
>  |||-- recsHistory: long (nullable = true)
> {code}
> Data contains multiple rows per id and start_time, with all stats elements 
> for a specific id and start_time is identical in all rows, I've noticed 
> inconsistent results when using window with FIRST(stats) DESC, and 
> LAST(stats) ASC.
> Specifically, the latter (LAST with ASC) produces more results.
> This is the query for seeing that:
> {code}
> SELECT DISTINCT
> id ,
> LAST(stats) over w
> FROM sample
> WINDOW w AS (PARTITION BY id  SORT BY start_time DESC)
> except
> SELECT DISTINCT
> id ,
> FIRST(stats) over w 
> FROM sample
> WINDOW w AS (PARTITION BY id  SORT BY start_time ASC)
> {code}
> Each of the subqueries should return the stats for the latest start_time, 
> partitioned by id.
> Changing the order of the subqueries returns nothing...
> The query with FIRST and ASC produces correct results.
> the data for sample is attached in [^sample.parquet.zip]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24906) Adaptively set split size for columnar file to ensure the task read data size fit expectation

2020-01-01 Thread Lior Chaga (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006345#comment-17006345
 ] 

Lior Chaga commented on SPARK-24906:


[~habren] 
The suggested approach of using a multiplier will be good for rather simple 
schemas. 
It will not work, however, for very complex schemas that include repeated 
fields (as the number of repetitions cannot be predicted). 

I suggest an alternative approach that would provide a better estimation for 
complex schemas, but would require a format-specific implementation. 
For Parquet, for instance, we could sample the metadata of several row groups 
(the sample ratio could be configurable) and use the column size vs. the total 
size of the row group to get a rather accurate estimation of the data to be 
read. 
The lower the variance in metadata between different row groups, the smaller 
the sample ratio that can be used. 
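
A rough sketch of the Parquet variant of this idea, assuming the parquet-hadoop 
footer API; the file path and projected columns below are placeholders:

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class ParquetSplitEstimator {

    /** Estimate the fraction of on-disk bytes actually read when only 'projected' columns are needed. */
    public static double estimateReadRatio(Configuration conf, Path file, Set<String> projected) throws Exception {
        ParquetMetadata footer = ParquetFileReader.readFooter(conf, file);
        long projectedBytes = 0L;
        long totalBytes = 0L;
        // This scans every row group; as suggested above, a configurable sample of row groups
        // is enough when the metadata variance between row groups is low.
        for (BlockMetaData rowGroup : footer.getBlocks()) {
            for (ColumnChunkMetaData column : rowGroup.getColumns()) {
                long size = column.getTotalSize(); // compressed on-disk size of the column chunk
                totalBytes += size;
                if (projected.contains(column.getPath().toDotString())) {
                    projectedBytes += size;
                }
            }
        }
        return totalBytes == 0 ? 1.0 : (double) projectedBytes / totalBytes;
    }

    public static void main(String[] args) throws Exception {
        double ratio = estimateReadRatio(new Configuration(),
                new Path("hdfs:///tmp/sample.parquet"),           // placeholder path
                new HashSet<>(Arrays.asList("id", "device_id"))); // placeholder projection
        // The effective maxPartitionBytes could then be enlarged by roughly 1/ratio.
        System.out.println("estimated read ratio = " + ratio);
    }
}
{code}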

> Adaptively set split size for columnar file to ensure the task read data size 
> fit expectation
> -
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Major
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar files, when Spark SQL reads a table, each split will be 128 MB 
> by default, since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even 
> when the user sets it to a large value, such as 512 MB, a task may read only 
> a few MB or even hundreds of KB, because the table (Parquet) may consist of 
> dozens of columns while the SQL needs only a few of them, and Spark will 
> prune the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integer and the other 20 
> are long. When a query uses one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4+8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the RM's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there are more than 5 TB 
> data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job lasts only 30 
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median of read data is 44.2MB. 
> !image-2018-07-24-20-30-24-552.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24906) Adaptively set split size for columnar file to ensure the task read data size fit expectation

2020-01-02 Thread Lior Chaga (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006691#comment-17006691
 ] 

Lior Chaga commented on SPARK-24906:


Looking at the PR, I see it only uses configurable estimations for Struct, Map, 
and Array types, not something based on row-group metadata or any sampling of 
real data. 
Is there additional development effort on this ticket?
I'd love to try and take part, with my limited Scala skills.

Anyway, not every heuristic is suitable for every use case. Rough estimations 
like the one in the attached PR are not good for complex schemas, or even for 
data sources that have many null values in the participating columns. 
On the other hand, sampling the metadata has a higher cost and might be 
overkill for the naive cases.
Perhaps the solution is to introduce a pluggable estimation strategy, providing 
the naive implementation as the default but allowing Spark users to provide 
their own strategy?
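
Something along these lines, for example; the interface name and signature are 
purely hypothetical and not an existing Spark API:

{code:java}
import java.util.List;

import org.apache.hadoop.fs.FileStatus;

/**
 * Hypothetical pluggable strategy: return an estimate, in (0, 1], of the fraction of
 * on-disk bytes a scan will actually read for the projected columns, so the effective
 * spark.sql.files.maxPartitionBytes can be scaled by roughly 1/ratio.
 */
public interface SplitSizeEstimationStrategy {
    double estimateReadRatio(List<String> projectedColumns, List<FileStatus> files);
}
{code}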

> Adaptively set split size for columnar file to ensure the task read data size 
> fit expectation
> -
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Major
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar files, when Spark SQL reads a table, each split will be 128 MB 
> by default, since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even 
> when the user sets it to a large value, such as 512 MB, a task may read only 
> a few MB or even hundreds of KB, because the table (Parquet) may consist of 
> dozens of columns while the SQL needs only a few of them, and Spark will 
> prune the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integer and the other 20 
> are long. When a query uses one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4+8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the RM's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there are more than 5 TB 
> data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job lasts only 30 
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median of read data is 44.2MB. 
> !image-2018-07-24-20-30-24-552.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35508) job group and description do not apply on broadcasts

2021-05-25 Thread Lior Chaga (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga updated SPARK-35508:
---
Description: 
Given the following code:
{code:java}
SparkContext context = new SparkContext("local", "test"); 
SparkSession session = new SparkSession(context); 
List strings = Lists.newArrayList("a", "b", "c"); 
List otherString = Lists.newArrayList( "b", "c", "d"); 
Dataset broadcastedDf = session.createDataset(strings, 
Encoders.STRING()).toDF(); 
Dataset dataframe = session.createDataset(otherString, 
Encoders.STRING()).toDF(); 
context.setJobGroup("my group", "my job", false); 
dataframe.join(broadcast(broadcastedDf), "value").count();

{code}
Job group and description do not apply on broadcasted dataframe. 
With spark 2.x, broadcast creation is given the same job description as the 
query itself. 
This seems to be broken with spark 3.x

See attached images
 !spark3-image.png!  !spark2-image.png! 

  was:
Given the following code:
{code:java}
SparkContext context = new SparkContext("local", "test"); 
SparkSession session = new SparkSession(context); 
List strings = Lists.newArrayList("a", "b", "c"); 
List otherString = Lists.newArrayList( "b", "c", "d"); 
Dataset broadcastedDf = session.createDataset(strings, 
Encoders.STRING()).toDF(); 
Dataset dataframe = session.createDataset(otherString, 
Encoders.STRING()).toDF(); 
context.setJobGroup("my group", "my job", false); 
dataframe.join(broadcast(broadcastedDf), "value").count();

{code}
Job group and description do not apply on broadcasted dataframe. 
With spark 2.x, broadcast creation is given the same job description as the 
query itself. 
This seems to be broken with spark 3.x

See attached images

!image-2021-05-25-09-39-36-816.png!

!image-2021-05-25-09-40-12-210.png!


> job group and description do not apply on broadcasts
> 
>
> Key: SPARK-35508
> URL: https://issues.apache.org/jira/browse/SPARK-35508
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.1.0
>Reporter: Lior Chaga
>Priority: Minor
> Attachments: spark2-image.png, spark3-image.png
>
>
> Given the following code:
> {code:java}
> SparkContext context = new SparkContext("local", "test"); 
> SparkSession session = new SparkSession(context); 
> List strings = Lists.newArrayList("a", "b", "c"); 
> List otherString = Lists.newArrayList( "b", "c", "d"); 
> Dataset broadcastedDf = session.createDataset(strings, 
> Encoders.STRING()).toDF(); 
> Dataset dataframe = session.createDataset(otherString, 
> Encoders.STRING()).toDF(); 
> context.setJobGroup("my group", "my job", false); 
> dataframe.join(broadcast(broadcastedDf), "value").count();
> {code}
> Job group and description do not apply on broadcasted dataframe. 
> With spark 2.x, broadcast creation is given the same job description as the 
> query itself. 
> This seems to be broken with spark 3.x
> See attached images
>  !spark3-image.png!  !spark2-image.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35508) job group and description do not apply on broadcasts

2021-05-25 Thread Lior Chaga (Jira)
Lior Chaga created SPARK-35508:
--

 Summary: job group and description do not apply on broadcasts
 Key: SPARK-35508
 URL: https://issues.apache.org/jira/browse/SPARK-35508
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.1.0, 3.0.0
Reporter: Lior Chaga


Given the following code:
{code:java}
SparkContext context = new SparkContext("local", "test");
SparkSession session = new SparkSession(context);
List<String> strings = Lists.newArrayList("a", "b", "c");
List<String> otherString = Lists.newArrayList("b", "c", "d");
Dataset<Row> broadcastedDf = session.createDataset(strings, 
Encoders.STRING()).toDF();
Dataset<Row> dataframe = session.createDataset(otherString, 
Encoders.STRING()).toDF();
context.setJobGroup("my group", "my job", false);
dataframe.join(broadcast(broadcastedDf), "value").count();

{code}
The job group and description do not apply to the broadcasted dataframe. 
With Spark 2.x, the broadcast creation is given the same job description as the 
query itself. 
This seems to be broken in Spark 3.x.

See attached images

!image-2021-05-25-09-39-36-816.png!

!image-2021-05-25-09-40-12-210.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35508) job group and description do not apply on broadcasts

2021-05-25 Thread Lior Chaga (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga updated SPARK-35508:
---
Attachment: spark2-image.png

> job group and description do not apply on broadcasts
> 
>
> Key: SPARK-35508
> URL: https://issues.apache.org/jira/browse/SPARK-35508
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.1.0
>Reporter: Lior Chaga
>Priority: Minor
> Attachments: spark2-image.png, spark3-image.png
>
>
> Given the following code:
> {code:java}
> SparkContext context = new SparkContext("local", "test"); 
> SparkSession session = new SparkSession(context); 
> List strings = Lists.newArrayList("a", "b", "c"); 
> List otherString = Lists.newArrayList( "b", "c", "d"); 
> Dataset broadcastedDf = session.createDataset(strings, 
> Encoders.STRING()).toDF(); 
> Dataset dataframe = session.createDataset(otherString, 
> Encoders.STRING()).toDF(); 
> context.setJobGroup("my group", "my job", false); 
> dataframe.join(broadcast(broadcastedDf), "value").count();
> {code}
> Job group and description do not apply on broadcasted dataframe. 
> With spark 2.x, broadcast creation is given the same job description as the 
> query itself. 
> This seems to be broken with spark 3.x
> See attached images
> !image-2021-05-25-09-39-36-816.png!
> !image-2021-05-25-09-40-12-210.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35508) job group and description do not apply on broadcasts

2021-05-25 Thread Lior Chaga (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga updated SPARK-35508:
---
Attachment: spark3-image.png

> job group and description do not apply on broadcasts
> 
>
> Key: SPARK-35508
> URL: https://issues.apache.org/jira/browse/SPARK-35508
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.1.0
>Reporter: Lior Chaga
>Priority: Minor
> Attachments: spark2-image.png, spark3-image.png
>
>
> Given the following code:
> {code:java}
> SparkContext context = new SparkContext("local", "test"); 
> SparkSession session = new SparkSession(context); 
> List strings = Lists.newArrayList("a", "b", "c"); 
> List otherString = Lists.newArrayList( "b", "c", "d"); 
> Dataset broadcastedDf = session.createDataset(strings, 
> Encoders.STRING()).toDF(); 
> Dataset dataframe = session.createDataset(otherString, 
> Encoders.STRING()).toDF(); 
> context.setJobGroup("my group", "my job", false); 
> dataframe.join(broadcast(broadcastedDf), "value").count();
> {code}
> Job group and description do not apply on broadcasted dataframe. 
> With spark 2.x, broadcast creation is given the same job description as the 
> query itself. 
> This seems to be broken with spark 3.x
> See attached images
> !image-2021-05-25-09-39-36-816.png!
> !image-2021-05-25-09-40-12-210.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35508) job group and description do not apply on broadcasts

2021-05-25 Thread Lior Chaga (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lior Chaga updated SPARK-35508:
---
Description: 
Given the following code:
{code:java}
SparkContext context = new SparkContext("local", "test"); 
SparkSession session = new SparkSession(context); 

List strings = Lists.newArrayList("a", "b", "c"); 
List otherString = Lists.newArrayList( "b", "c", "d"); 

Dataset broadcastedDf = session.createDataset(strings, 
Encoders.STRING()).toDF(); 
Dataset dataframe = session.createDataset(otherString, 
Encoders.STRING()).toDF(); 

context.setJobGroup("my group", "my job", false); 
dataframe.join(broadcast(broadcastedDf), "value").count();

{code}
Job group and description do not apply on broadcasted dataframe. 
With spark 2.x, broadcast creation is given the same job description as the 
query itself. 
This seems to be broken with spark 3.x

See attached images
 !spark3-image.png!  !spark2-image.png! 

  was:
Given the following code:
{code:java}
SparkContext context = new SparkContext("local", "test"); 
SparkSession session = new SparkSession(context); 
List strings = Lists.newArrayList("a", "b", "c"); 
List otherString = Lists.newArrayList( "b", "c", "d"); 
Dataset broadcastedDf = session.createDataset(strings, 
Encoders.STRING()).toDF(); 
Dataset dataframe = session.createDataset(otherString, 
Encoders.STRING()).toDF(); 
context.setJobGroup("my group", "my job", false); 
dataframe.join(broadcast(broadcastedDf), "value").count();

{code}
Job group and description do not apply on broadcasted dataframe. 
With spark 2.x, broadcast creation is given the same job description as the 
query itself. 
This seems to be broken with spark 3.x

See attached images
 !spark3-image.png!  !spark2-image.png! 


> job group and description do not apply on broadcasts
> 
>
> Key: SPARK-35508
> URL: https://issues.apache.org/jira/browse/SPARK-35508
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0, 3.1.0
>Reporter: Lior Chaga
>Priority: Minor
> Attachments: spark2-image.png, spark3-image.png
>
>
> Given the following code:
> {code:java}
> SparkContext context = new SparkContext("local", "test"); 
> SparkSession session = new SparkSession(context); 
> List strings = Lists.newArrayList("a", "b", "c"); 
> List otherString = Lists.newArrayList( "b", "c", "d"); 
> Dataset broadcastedDf = session.createDataset(strings, 
> Encoders.STRING()).toDF(); 
> Dataset dataframe = session.createDataset(otherString, 
> Encoders.STRING()).toDF(); 
> context.setJobGroup("my group", "my job", false); 
> dataframe.join(broadcast(broadcastedDf), "value").count();
> {code}
> Job group and description do not apply on broadcasted dataframe. 
> With spark 2.x, broadcast creation is given the same job description as the 
> query itself. 
> This seems to be broken with spark 3.x
> See attached images
>  !spark3-image.png!  !spark2-image.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org