[jira] [Assigned] (SPARK-26780) Improve shuffle read using ReadAheadInputStream

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26780:


Assignee: Apache Spark

> Improve  shuffle read using ReadAheadInputStream 
> -
>
> Key: SPARK-26780
> URL: https://issues.apache.org/jira/browse/SPARK-26780
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: liuxian
>Assignee: Apache Spark
>Priority: Major
>
> Use _ReadAheadInputStream_ to improve shuffle read performance.
> _ReadAheadInputStream_ reduces CPU utilization with almost no performance
> regression.





[jira] [Assigned] (SPARK-26780) Improve shuffle read using ReadAheadInputStream

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26780:


Assignee: (was: Apache Spark)

> Improve  shuffle read using ReadAheadInputStream 
> -
>
> Key: SPARK-26780
> URL: https://issues.apache.org/jira/browse/SPARK-26780
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: liuxian
>Priority: Major
>
> Use _ReadAheadInputStream_ to improve shuffle read performance.
> _ReadAheadInputStream_ reduces CPU utilization with almost no performance
> regression.





[jira] [Created] (SPARK-26780) Improve shuffle read using ReadAheadInputStream

2019-01-29 Thread liuxian (JIRA)
liuxian created SPARK-26780:
---

 Summary: Improve  shuffle read using ReadAheadInputStream 
 Key: SPARK-26780
 URL: https://issues.apache.org/jira/browse/SPARK-26780
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 3.0.0
Reporter: liuxian


Use _ReadAheadInputStream_ to improve shuffle read performance.
_ReadAheadInputStream_ reduces CPU utilization with almost no performance
regression.
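
For context, the point of read-ahead buffering is to overlap disk I/O with decompression and deserialization on the read path. The sketch below only illustrates that technique; it is not Spark's org.apache.spark.io.ReadAheadInputStream, and the class and member names are invented for the example. While the caller drains one buffer, a background thread prefetches the next chunk from the underlying stream.

{code:scala}
import java.io.InputStream
import java.util.concurrent.{Callable, Executors, Future}

// Illustrative double-buffered read-ahead wrapper; names and structure are
// made up for this sketch and are not Spark's ReadAheadInputStream.
class SimpleReadAheadInputStream(in: InputStream, bufferSize: Int) extends InputStream {
  private val executor = Executors.newSingleThreadExecutor()
  private var current = new Array[Byte](bufferSize) // buffer being consumed
  private var next = new Array[Byte](bufferSize)    // buffer being prefetched
  private var currentLen = 0                        // valid bytes in `current`
  private var pos = 0                               // read offset in `current`
  private var pending: Future[Integer] = prefetch(next)

  // Fill `buf` as far as possible; returns the byte count, or -1 at end of stream.
  private def readChunk(buf: Array[Byte]): Int = {
    var off = 0
    var n = 0
    while (off < buf.length && { n = in.read(buf, off, buf.length - off); n != -1 }) {
      off += n
    }
    if (off == 0 && n == -1) -1 else off
  }

  // Start filling `buf` on the background thread.
  private def prefetch(buf: Array[Byte]): Future[Integer] =
    executor.submit(new Callable[Integer] { override def call(): Integer = readChunk(buf) })

  // Wait for the prefetched buffer, make it current, and start the next prefetch.
  private def swapBuffers(): Boolean = {
    currentLen = pending.get()
    if (currentLen == -1) return false
    val tmp = current; current = next; next = tmp
    pos = 0
    pending = prefetch(next)
    true
  }

  override def read(): Int = {
    if (pos >= currentLen && !swapBuffers()) return -1
    val b = current(pos) & 0xff
    pos += 1
    b
  }

  override def close(): Unit = {
    executor.shutdownNow()
    in.close()
  }
}
{code}

In the shuffle-read path the same idea would apply to the block streams handed to the record iterator, which is what this ticket proposes to wire up using Spark's existing ReadAheadInputStream.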





[jira] [Commented] (SPARK-24360) Support Hive 3.1 metastore

2019-01-29 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755770#comment-16755770
 ] 

Dongjoon Hyun commented on SPARK-24360:
---

I made a new PR for HMS 3.1.

> Support Hive 3.1 metastore
> --
>
> Key: SPARK-24360
> URL: https://issues.apache.org/jira/browse/SPARK-24360
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Priority: Major
>
> Hive 3.1.0 is released. This issue aims to support Hive Metastore 3.1.
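
For readers unfamiliar with how the metastore version is chosen, the sketch below shows the existing user-facing knobs this ticket would extend. The two config keys already exist; accepting "3.1" as a value is exactly what this issue adds, so treat that value as an assumption until the change lands.

{code:scala}
// Hedged sketch: selecting an external Hive Metastore version at session build time.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hms-3.1-example")
  .enableHiveSupport()
  .config("spark.sql.hive.metastore.version", "3.1")  // value proposed by SPARK-24360
  .config("spark.sql.hive.metastore.jars", "maven")   // or a classpath with HMS 3.1 jars
  .getOrCreate()
{code}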





[jira] [Comment Edited] (SPARK-25420) Dataset.count() every time is different.

2019-01-29 Thread huanghuai (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755762#comment-16755762
 ] 

huanghuai edited comment on SPARK-25420 at 1/30/19 7:13 AM:


--- the code to reproduce ---

SparkSession spark = SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions", 3);
spark.conf().set("spark.default.parallelism", 3);
List data1 = Arrays.asList(
    RowFactory.create("jack", 22, "gz"),
    RowFactory.create("lisi", 22, "sh"),
    RowFactory.create("wangwu", 22, "tj"),
    RowFactory.create("leo", 21, "alibaba"),
    RowFactory.create("a", 22, "sz"),
    RowFactory.create("b", 22, "sz1"),
    RowFactory.create("c", 22, "sz2"),
    RowFactory.create("d", 22, "sz3"),
    RowFactory.create("mic", 22, "bc")
);
StructType schema1 = new StructType(new StructField[]{
    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
    new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("address", DataTypes.StringType, true, Metadata.empty()),
});

for (int i = 0; i < 5; i++) {
    Dataset dropDuplicates = spark.createDataFrame(data1, schema1)
        .repartition(2) // with repartition(1), (2) or (3) you can see the difference in show() every time
        .dropDuplicates("age");
    dropDuplicates.show();
    dropDuplicates = dropDuplicates.filter("address like 'sz%'");
    System.out.println("count=" + dropDuplicates.count());
    // I find another problem: filter("address='sz2'").show() returns two records,
    // but filter("address='sz2'").count() returns 0
}

--- It may not be a bug in itself, just a usage problem, or something else. ---


was (Author: zhiyin1233):
--- the code to reproduce ---

SparkSession spark = SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions", 3);
spark.conf().set("spark.default.parallelism", 3);
List data1 = Arrays.asList(
    RowFactory.create("jack", 22, "gz"),
    RowFactory.create("lisi", 22, "sh"),
    RowFactory.create("wangwu", 22, "tj"),
    RowFactory.create("leo", 21, "alibaba"),
    RowFactory.create("a", 22, "sz"),
    RowFactory.create("b", 22, "sz1"),
    RowFactory.create("c", 22, "sz2"),
    RowFactory.create("d", 22, "sz3"),
    RowFactory.create("mic", 22, "bc")
);
StructType schema1 = new StructType(new StructField[]{
    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
    new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("address", DataTypes.StringType, true, Metadata.empty()),
});

for (int i = 0; i < 5; i++) {
    Dataset dropDuplicates = spark.createDataFrame(data1, schema1)
        .repartition(2) // with repartition(1), (2) or (3) you can see the difference in show() every time
        .dropDuplicates("age");
    dropDuplicates.show();
    dropDuplicates = dropDuplicates.filter("address like 'sz%'");
    System.out.println("count=" + dropDuplicates.count());
    // I find another problem: filter("address='sz2'").show() returns two records,
    // but filter("address='sz2'").count() returns 0
}

--- It may not be a bug in itself, just a usage problem, or something else. ---

> Dataset.count()  every time is different.
> -
>
> Key: SPARK-25420
> URL: https://issues.apache.org/jira/browse/SPARK-25420
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 2.3.0
> Environment: spark2.3
> standalone
>Reporter: huanghuai
>Priority: Major
>  Labels: SQL
>
> Dataset dataset = sparkSession.read().format("csv").option("sep", 
> ",").option("inferSchema", "true")
>  .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>  .option("encoding", "UTF-8")
>  .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count="+dataset.count());
> Dataset dropDuplicates = dataset.dropDuplicates(new 
> String[]\{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1="+dropDuplicates.count());
> System.out.println("dropDuplicates count2="+dropDuplicates.count());
> Dataset filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 
> and (status = 0 or status = 1)");
> System.out.println("filter count1="+filter.count());
> System.out.println("filter count2="+filter.count());
> System.out.println("filter count3="+filter.count());
> System.out.println("filter count4="+filter.count());
> System.out.println("filter count5="+filter.count());
>  
>  
> 

[jira] [Resolved] (SPARK-26378) Queries of wide CSV/JSON data slowed after SPARK-26151

2019-01-29 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26378.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23336
[https://github.com/apache/spark/pull/23336]

> Queries of wide CSV/JSON data slowed after SPARK-26151
> --
>
> Key: SPARK-26378
> URL: https://issues.apache.org/jira/browse/SPARK-26378
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Assignee: Bruce Robbins
>Priority: Major
> Fix For: 3.0.0
>
>
> A recent change significantly slowed the queries of wide CSV tables. For 
> example, queries against a 6000 column table slowed by 45-48% when queried 
> with a single executor.
>   
>  The [PR for 
> SPARK-26151|https://github.com/apache/spark/commit/11e5f1bcd49eec8ab4225d6e68a051b5c6a21cb2]
>  changed FailureSafeParser#toResultRow such that the returned function 
> recreates every row, even when the associated input record has no parsing 
> issues and the user specified no corrupt-record field in their schema. This 
> extra processing is responsible for the slowdown.
> The change to FailureSafeParser also impacted queries of wide JSON tables.
>  I propose that a row should be recreated only if there is a parsing error or 
> columns need to be shifted due to the existence of a corrupt column field in 
> the user-supplied schema. Otherwise, the row should be used as-is.
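
To make the described fix concrete, here is a simplified sketch of the intended fast path. The class, names and types below are illustrative only and are not the actual FailureSafeParser internals: the point is that a successfully parsed row is passed through untouched unless parsing failed or the user schema declares a corrupt-record column.

{code:scala}
// Simplified, illustrative-only model of the behaviour described above;
// not Spark's FailureSafeParser. A row is modelled as a plain Seq[Any].
class LenientParserModel(schemaLength: Int, corruptColumnIndex: Option[Int]) {

  // parsed = Some(fields) when the record parsed cleanly, None when it failed.
  def toResultRow(parsed: Option[Seq[Any]], rawRecord: () => String): Seq[Any] =
    (parsed, corruptColumnIndex) match {
      case (Some(fields), None) =>
        fields                              // fast path: reuse the row as-is
      case (Some(fields), Some(idx)) =>
        fields.patch(idx, Seq(null), 0)     // shift fields around the corrupt column
      case (None, idx) =>
        // parse failure: all-null row, optionally carrying the raw record text
        val nulls = Seq.fill[Any](schemaLength)(null)
        idx.fold(nulls)(i => nulls.updated(i, rawRecord()))
    }
}
{code}

In this model the slow path only runs for malformed records or when a corrupt-record column exists in the schema, which matches the "used as-is" wording above.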





[jira] [Assigned] (SPARK-26378) Queries of wide CSV/JSON data slowed after SPARK-26151

2019-01-29 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-26378:


Assignee: Bruce Robbins

> Queries of wide CSV/JSON data slowed after SPARK-26151
> --
>
> Key: SPARK-26378
> URL: https://issues.apache.org/jira/browse/SPARK-26378
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Assignee: Bruce Robbins
>Priority: Major
>
> A recent change significantly slowed the queries of wide CSV tables. For 
> example, queries against a 6000 column table slowed by 45-48% when queried 
> with a single executor.
>   
>  The [PR for 
> SPARK-26151|https://github.com/apache/spark/commit/11e5f1bcd49eec8ab4225d6e68a051b5c6a21cb2]
>  changed FailureSafeParser#toResultRow such that the returned function 
> recreates every row, even when the associated input record has no parsing 
> issues and the user specified no corrupt-record field in their schema. This 
> extra processing is responsible for the slowdown.
> The change to FailureSafeParser also impacted queries of wide JSON tables.
>  I propose that a row should be recreated only if there is a parsing error or 
> columns need to be shifted due to the existence of a corrupt column field in 
> the user-supplied schema. Otherwise, the row should be used as-is.





[jira] [Comment Edited] (SPARK-25420) Dataset.count() every time is different.

2019-01-29 Thread huanghuai (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755762#comment-16755762
 ] 

huanghuai edited comment on SPARK-25420 at 1/30/19 7:11 AM:


--- the code to reproduce ---

SparkSession spark = SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions", 3);
spark.conf().set("spark.default.parallelism", 3);
List data1 = Arrays.asList(
    RowFactory.create("jack", 22, "gz"),
    RowFactory.create("lisi", 22, "sh"),
    RowFactory.create("wangwu", 22, "tj"),
    RowFactory.create("leo", 21, "alibaba"),
    RowFactory.create("a", 22, "sz"),
    RowFactory.create("b", 22, "sz1"),
    RowFactory.create("c", 22, "sz2"),
    RowFactory.create("d", 22, "sz3"),
    RowFactory.create("mic", 22, "bc")
);
StructType schema1 = new StructType(new StructField[]{
    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
    new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("address", DataTypes.StringType, true, Metadata.empty()),
});

for (int i = 0; i < 5; i++) {
    Dataset dropDuplicates = spark.createDataFrame(data1, schema1)
        .repartition(2) // with repartition(1), (2) or (3) you can see the difference in show() every time
        .dropDuplicates("age");
    dropDuplicates.show();
    dropDuplicates = dropDuplicates.filter("address like 'sz%'");
    System.out.println("count=" + dropDuplicates.count());
    // I find another problem: filter("address='sz2'").show() returns two records,
    // but filter("address='sz2'").count() returns 0
}

--- It may not be a bug in itself, just a usage problem, or something else. ---


was (Author: zhiyin1233):
---  the code to be  reproduced 
--

SparkSession spark = 
SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
 spark.conf().set("spark.sql.shuffle.partitions", 3);
 spark.conf().set("spark.default.parallelism", 3);
 List data1 = Arrays.asList(
 RowFactory.create("jack", 22, "gz"),
 RowFactory.create("lisi", 22, "sh"),
 RowFactory.create("wangwu", 22, "tj"),
 RowFactory.create("leo", 21, "alibaba"),
 RowFactory.create("a", 22, "sz"),
 RowFactory.create("b", 22, "sz1"),
 RowFactory.create("c", 22, "sz2"),
 RowFactory.create("d", 22, "sz3"),
 RowFactory.create("mic", 22, "bc")
 );
 StructType schema1 = new StructType(
 new StructField[]{
 new StructField("name", DataTypes.StringType, true, Metadata.empty()),
 new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
 new StructField("address", DataTypes.StringType, true, Metadata.empty()),
 });


 for (int i = 0; i < 5; i++) {
 Dataset dropDuplicates = spark.createDataFrame(data1, schema1)
 .repartition(2) //  when you set repartition is 1,2,3 everytime you can 
see the difference in show method.
 .dropDuplicates("dropDuplicates");
 dropDuplicates.show();
 dropDuplicates = dropDuplicates.filter("address like 'sz%'");
 System.out.println("count=" + dropDuplicates.count());
 //I find another problem: filter("address='sz2').show() ,the result have two 
record
 // but if I use:filter("address='sz2').count() , the count is 0
 }

--- It may not be a bug in itself, just a usage problem, or something else. ---

> Dataset.count()  every time is different.
> -
>
> Key: SPARK-25420
> URL: https://issues.apache.org/jira/browse/SPARK-25420
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 2.3.0
> Environment: spark2.3
> standalone
>Reporter: huanghuai
>Priority: Major
>  Labels: SQL
>
> Dataset dataset = sparkSession.read().format("csv").option("sep", 
> ",").option("inferSchema", "true")
>  .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>  .option("encoding", "UTF-8")
>  .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count="+dataset.count());
> Dataset dropDuplicates = dataset.dropDuplicates(new 
> String[]\{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1="+dropDuplicates.count());
> System.out.println("dropDuplicates count2="+dropDuplicates.count());
> Dataset filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 
> and (status = 0 or status = 1)");
> System.out.println("filter count1="+filter.count());
> System.out.println("filter count2="+filter.count());
> System.out.println("filter count3="+filter.count());
> System.out.println("filter count4="+filter.count());
> System.out.println("filter count5="+filter.count());
>  
>  
> 

[jira] [Comment Edited] (SPARK-25420) Dataset.count() every time is different.

2019-01-29 Thread huanghuai (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755762#comment-16755762
 ] 

huanghuai edited comment on SPARK-25420 at 1/30/19 7:06 AM:


---  the code to be  reproduced 
--

SparkSession spark = 
SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
 spark.conf().set("spark.sql.shuffle.partitions", 3);
 spark.conf().set("spark.default.parallelism", 3);
 List data1 = Arrays.asList(
 RowFactory.create("jack", 22, "gz"),
 RowFactory.create("lisi", 22, "sh"),
 RowFactory.create("wangwu", 22, "tj"),
 RowFactory.create("leo", 21, "alibaba"),
 RowFactory.create("a", 22, "sz"),
 RowFactory.create("b", 22, "sz1"),
 RowFactory.create("c", 22, "sz2"),
 RowFactory.create("d", 22, "sz3"),
 RowFactory.create("mic", 22, "bc")
 );
 StructType schema1 = new StructType(
 new StructField[]{
 new StructField("name", DataTypes.StringType, true, Metadata.empty()),
 new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
 new StructField("address", DataTypes.StringType, true, Metadata.empty()),
 });


 for (int i = 0; i < 5; i++) {
 Dataset dropDuplicates = spark.createDataFrame(data1, schema1)
 .repartition(2) //  when you set repartition is 1,2,3 everytime you can 
see the difference in show method.
 .dropDuplicates("dropDuplicates");
 dropDuplicates.show();
 dropDuplicates = dropDuplicates.filter("address like 'sz%'");
 System.out.println("count=" + dropDuplicates.count());
 //I find another problem: filter("address='sz2').show() ,the result have two 
record
 // but if I use:filter("address='sz2').count() , the count is 0
 }

--- It may not be a bug in itself, just a usage problem, or something else. ---


was (Author: zhiyin1233):
---  the code to be  reproduced 
--

SparkSession spark = 
SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions", 3);
spark.conf().set("spark.default.parallelism", 3);
List data1 = Arrays.asList(
 RowFactory.create("jack", 22, "gz"),
 RowFactory.create("lisi", 22, "sh"),
 RowFactory.create("wangwu", 22, "tj"),
 RowFactory.create("leo", 21, "alibaba"),
 RowFactory.create("a", 22, "sz"),
 RowFactory.create("b", 22, "sz1"),
 RowFactory.create("c", 22, "sz2"),
 RowFactory.create("d", 22, "sz3"),
 RowFactory.create("mic", 22, "bc")
);
StructType schema1 = new StructType(
 new StructField[]{
 new StructField("name", DataTypes.StringType, true, Metadata.empty()),
 new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
 new StructField("address", DataTypes.StringType, true, Metadata.empty()),
 });


for (int i = 0; i < 5; i++) {
 Dataset dropDuplicates = spark.createDataFrame(data1, schema1)
 .repartition(2) //  when you set repartition is 1,2,3 everytime you can 
see the difference in show method.
 .dropDuplicates("dropDuplicates");
 dropDuplicates.show();
 dropDuplicates = dropDuplicates.filter("address like 'sz%'");
 System.out.println("count=" + dropDuplicates.count());
 //I find another problem: filter("address='sz2').show() ,the result have two 
record
 // but if I use:filter("address='sz2').count() , the count is 0
}

 

 

 

--- It may not be a bug in itself, just a usage problem, or something else. ---

> Dataset.count()  every time is different.
> -
>
> Key: SPARK-25420
> URL: https://issues.apache.org/jira/browse/SPARK-25420
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 2.3.0
> Environment: spark2.3
> standalone
>Reporter: huanghuai
>Priority: Major
>  Labels: SQL
>
> Dataset dataset = sparkSession.read().format("csv").option("sep", 
> ",").option("inferSchema", "true")
>  .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>  .option("encoding", "UTF-8")
>  .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count="+dataset.count());
> Dataset dropDuplicates = dataset.dropDuplicates(new 
> String[]\{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1="+dropDuplicates.count());
> System.out.println("dropDuplicates count2="+dropDuplicates.count());
> Dataset filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 
> and (status = 0 or status = 1)");
> System.out.println("filter count1="+filter.count());
> System.out.println("filter count2="+filter.count());
> System.out.println("filter count3="+filter.count());
> System.out.println("filter count4="+filter.count());
> System.out.println("filter count5="+filter.count());
>  
>  
> 

[jira] [Commented] (SPARK-25420) Dataset.count() every time is different.

2019-01-29 Thread huanghuai (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755762#comment-16755762
 ] 

huanghuai commented on SPARK-25420:
---

---  the code to be  reproduced 
--

SparkSession spark = 
SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions", 3);
spark.conf().set("spark.default.parallelism", 3);
List data1 = Arrays.asList(
 RowFactory.create("jack", 22, "gz"),
 RowFactory.create("lisi", 22, "sh"),
 RowFactory.create("wangwu", 22, "tj"),
 RowFactory.create("leo", 21, "alibaba"),
 RowFactory.create("a", 22, "sz"),
 RowFactory.create("b", 22, "sz1"),
 RowFactory.create("c", 22, "sz2"),
 RowFactory.create("d", 22, "sz3"),
 RowFactory.create("mic", 22, "bc")
);
StructType schema1 = new StructType(
 new StructField[]{
 new StructField("name", DataTypes.StringType, true, Metadata.empty()),
 new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
 new StructField("address", DataTypes.StringType, true, Metadata.empty()),
 });


for (int i = 0; i < 5; i++) {
 Dataset dropDuplicates = spark.createDataFrame(data1, schema1)
 .repartition(2) //  when you set repartition is 1,2,3 everytime you can 
see the difference in show method.
 .dropDuplicates("dropDuplicates");
 dropDuplicates.show();
 dropDuplicates = dropDuplicates.filter("address like 'sz%'");
 System.out.println("count=" + dropDuplicates.count());
 //I find another problem: filter("address='sz2').show() ,the result have two 
record
 // but if I use:filter("address='sz2').count() , the count is 0
}

 

 

 

--- It may not be a bug in itself, just a usage problem, or something else. ---
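
One common reading of the reproducer above: dropDuplicates keeps an arbitrary row per duplicate key, and which row survives can change between actions because partitioning and task order are not fixed, so a later filter can match a different number of survivors each time. A hedged Scala sketch of a deterministic alternative follows; it reuses the spark, data1 and schema1 names from the Java reproducer above, and the tiebreaker columns are an arbitrary choice for the example.

{code:scala}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Pick one explicit winner per "age" instead of whichever row dropDuplicates
// happens to keep on a given run; ordering on (name, address) is an arbitrary
// but deterministic tiebreaker for this example.
val w = Window.partitionBy("age").orderBy("name", "address")
val deduped = spark.createDataFrame(data1, schema1)
  .withColumn("rn", row_number().over(w))
  .filter("rn = 1")
  .drop("rn")

// The surviving row per key no longer depends on partitioning or execution
// order, so repeated counts over the same filter agree.
println(deduped.filter("address like 'sz%'").count())
{code}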

> Dataset.count()  every time is different.
> -
>
> Key: SPARK-25420
> URL: https://issues.apache.org/jira/browse/SPARK-25420
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 2.3.0
> Environment: spark2.3
> standalone
>Reporter: huanghuai
>Priority: Major
>  Labels: SQL
>
> Dataset dataset = sparkSession.read().format("csv").option("sep", 
> ",").option("inferSchema", "true")
>  .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>  .option("encoding", "UTF-8")
>  .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count="+dataset.count());
> Dataset dropDuplicates = dataset.dropDuplicates(new 
> String[]\{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1="+dropDuplicates.count());
> System.out.println("dropDuplicates count2="+dropDuplicates.count());
> Dataset filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 
> and (status = 0 or status = 1)");
> System.out.println("filter count1="+filter.count());
> System.out.println("filter count2="+filter.count());
> System.out.println("filter count3="+filter.count());
> System.out.println("filter count4="+filter.count());
> System.out.println("filter count5="+filter.count());
>  
>  
> --The above is code 
> ---
>  
>  
> console output:
> source count=459275
> dropDuplicates count1=453987
> dropDuplicates count2=453987
> filter count1=445798
> filter count2=445797
> filter count3=445797
> filter count4=445798
> filter count5=445799
>  
> question:
>  
> Why is filter.count() different every time?
> If I remove dropDuplicates(), everything is fine!
>  





[jira] [Assigned] (SPARK-26768) Remove useless code in BlockManager

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26768:


Assignee: Apache Spark

> Remove useless code in BlockManager
> ---
>
> Key: SPARK-26768
> URL: https://issues.apache.org/jira/browse/SPARK-26768
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: liupengcheng
>Assignee: Apache Spark
>Priority: Major
> Attachments: Selection_037.jpg
>
>
> Recently, while reading the code of `BlockManager.getBlockData`, I 
> found some code that can never be reached. The related code 
> is below:
>  
> {code:java}
> override def getBlockData(blockId: BlockId): ManagedBuffer = {
>   if (blockId.isShuffle) {
> 
> shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
>   } else {
> getLocalBytes(blockId) match {
>   case Some(blockData) =>
> new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
> true)
>   case None =>
> // If this block manager receives a request for a block that it 
> doesn't have then it's
> // likely that the master has outdated block statuses for this block. 
> Therefore, we send
> // an RPC so that this block is marked as being unavailable from this 
> block manager.
> reportBlockStatus(blockId, BlockStatus.empty)
> throw new BlockNotFoundException(blockId.toString)
> }
>   }
> }
> {code}
> {code:java}
> def getLocalBytes(blockId: BlockId): Option[BlockData] = {
>   logDebug(s"Getting local block $blockId as bytes")
>   // As an optimization for map output fetches, if the block is for a 
> shuffle, return it
>   // without acquiring a lock; the disk store never deletes (recent) items so 
> this should work
>   if (blockId.isShuffle) {
> val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
> // TODO: This should gracefully handle case where local block is not 
> available. Currently
> // downstream code will throw an exception.
> val buf = new ChunkedByteBuffer(
>   
> shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
> Some(new ByteBufferBlockData(buf, true))
>   } else {
> blockInfoManager.lockForReading(blockId).map { info => 
> doGetLocalBytes(blockId, info) }
>   }
> }
> {code}
> `blockId.isShuffle` is checked twice; however, in the method call hierarchy 
> of `BlockManager.getLocalBytes`, the only other call site of 
> `BlockManager.getLocalBytes` is `TorrentBroadcast.readBlocks`, where the 
> blockId can never be a `ShuffleBlockId`.
>   !Selection_037.jpg!
> So I think we should remove this unreachable code to make it easier to read.
>  
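
To make the proposal concrete, this is roughly what `getLocalBytes` would look like with the shuffle fast path dropped, assuming the call-hierarchy analysis above holds; it is a sketch of the proposal derived from the code quoted above, not a merged change.

{code:scala}
// Sketch only: getLocalBytes without the isShuffle branch, relying on the
// claim above that no remaining caller passes a ShuffleBlockId.
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  assert(!blockId.isShuffle, s"Unexpected ShuffleBlockId $blockId")
  blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
}
{code}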





[jira] [Assigned] (SPARK-26768) Remove useless code in BlockManager

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26768:


Assignee: (was: Apache Spark)

> Remove useless code in BlockManager
> ---
>
> Key: SPARK-26768
> URL: https://issues.apache.org/jira/browse/SPARK-26768
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_037.jpg
>
>
> Recently, while reading the code of `BlockManager.getBlockData`, I 
> found some code that can never be reached. The related code 
> is below:
>  
> {code:java}
> override def getBlockData(blockId: BlockId): ManagedBuffer = {
>   if (blockId.isShuffle) {
> 
> shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
>   } else {
> getLocalBytes(blockId) match {
>   case Some(blockData) =>
> new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
> true)
>   case None =>
> // If this block manager receives a request for a block that it 
> doesn't have then it's
> // likely that the master has outdated block statuses for this block. 
> Therefore, we send
> // an RPC so that this block is marked as being unavailable from this 
> block manager.
> reportBlockStatus(blockId, BlockStatus.empty)
> throw new BlockNotFoundException(blockId.toString)
> }
>   }
> }
> {code}
> {code:java}
> def getLocalBytes(blockId: BlockId): Option[BlockData] = {
>   logDebug(s"Getting local block $blockId as bytes")
>   // As an optimization for map output fetches, if the block is for a 
> shuffle, return it
>   // without acquiring a lock; the disk store never deletes (recent) items so 
> this should work
>   if (blockId.isShuffle) {
> val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
> // TODO: This should gracefully handle case where local block is not 
> available. Currently
> // downstream code will throw an exception.
> val buf = new ChunkedByteBuffer(
>   
> shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
> Some(new ByteBufferBlockData(buf, true))
>   } else {
> blockInfoManager.lockForReading(blockId).map { info => 
> doGetLocalBytes(blockId, info) }
>   }
> }
> {code}
> `blockId.isShuffle` is checked twice; however, in the method call hierarchy 
> of `BlockManager.getLocalBytes`, the only other call site of 
> `BlockManager.getLocalBytes` is `TorrentBroadcast.readBlocks`, where the 
> blockId can never be a `ShuffleBlockId`.
>   !Selection_037.jpg!
> So I think we should remove this unreachable code to make it easier to read.
>  





[jira] [Commented] (SPARK-26749) spark streaming kafka verison for high version

2019-01-29 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755758#comment-16755758
 ] 

Hyukjin Kwon commented on SPARK-26749:
--

https://spark.apache.org/community.html

> spark streaming kafka verison  for high version
> ---
>
> Key: SPARK-26749
> URL: https://issues.apache.org/jira/browse/SPARK-26749
> Project: Spark
>  Issue Type: Question
>  Components: Java API
>Affects Versions: 2.2.0
>Reporter: Chang Quanyou
>Priority: Major
>
> When I use `spark-streaming-kafka-0-10_2.11` to consume a Kafka topic, the 
> bundled client is kafka-clients:0.10.0.1, i.e. the embedded Kafka client 
> version is 0.10.0.1. Our Kafka cluster used to be 0.10.1, but now we are 
> upgrading the cluster to 1.0 while the bundled dependency is still on 0.10. 
> The documentation says brokers 0.10.0 or higher are supported, so the Kafka 
> client version does not match the Kafka cluster version. I tried it and saw 
> no issues, but I am not sure it is safe. What should I do?
>  
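
One option (a hedged sketch, not official guidance): Kafka brokers are generally backward compatible with older clients, so the bundled 0.10.0.1 client is expected to keep working against a newer cluster. If you still want the client jar aligned with the 1.0 brokers, you can pin a newer kafka-clients next to the connector and test it against your own cluster, for example in sbt:

{code:scala}
// Hedged sbt sketch: keep the 0-10 connector but pin a newer kafka-clients.
// Versions here are examples; verify compatibility against your own cluster.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0",
  "org.apache.kafka"  % "kafka-clients"              % "1.0.0"
)
{code}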





[jira] [Commented] (SPARK-26699) Dataset column output discrepancies

2019-01-29 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755756#comment-16755756
 ] 

Hyukjin Kwon commented on SPARK-26699:
--

Please take a look at https://spark.apache.org/community.html

> Dataset column output discrepancies 
> 
>
> Key: SPARK-26699
> URL: https://issues.apache.org/jira/browse/SPARK-26699
> Project: Spark
>  Issue Type: Question
>  Components: Input/Output
>Affects Versions: 2.3.2
>Reporter: Praveena
>Priority: Major
>
> Hi,
>  
> When I run my job in local mode (i.e. standalone in Eclipse) with the same 
> parquet input files, the output is:
>  
> locations
>  
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  null
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  [[[true, [[, phys...
>  
> But when I run the same code base with the same input parquet files in YARN 
> cluster mode, my output is as below:
> 
>  locations
>  
>  [*WrappedArray*([tr...
>  [*WrappedArray*([tr...
>  [WrappedArray([tr...
>  null
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
>  [WrappedArray([tr...
> It's appending WrappedArray :(
> I am using Apache Spark 2.3.2 and EMR version 5.19.0. What 
> could be the reason for the discrepancies in the output of certain table columns?
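
One way to check whether this is only a display/rendering difference rather than a data difference (a hedged sketch; `dataset` stands for the DataFrame loaded from the parquet files): serialize the column to JSON and compare that output between the local and YARN runs.

{code:scala}
// Compare the actual column contents independently of how show() renders
// nested arrays (WrappedArray vs. bracket syntax) in different environments.
dataset.select("locations").toJSON.show(10, false)
{code}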





[jira] [Commented] (SPARK-25420) Dataset.count() every time is different.

2019-01-29 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755757#comment-16755757
 ] 

Jungtaek Lim commented on SPARK-25420:
--

[~jeffrey.mak]
Hmm... the result looks odd, but it is also not easy to investigate unless we 
can narrow the reproducer down to a smaller dataset.

[~mgaido] WDYT about Jeffrey's case?

> Dataset.count()  every time is different.
> -
>
> Key: SPARK-25420
> URL: https://issues.apache.org/jira/browse/SPARK-25420
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 2.3.0
> Environment: spark2.3
> standalone
>Reporter: huanghuai
>Priority: Major
>  Labels: SQL
>
> Dataset dataset = sparkSession.read().format("csv").option("sep", 
> ",").option("inferSchema", "true")
>  .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>  .option("encoding", "UTF-8")
>  .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count="+dataset.count());
> Dataset dropDuplicates = dataset.dropDuplicates(new 
> String[]\{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1="+dropDuplicates.count());
> System.out.println("dropDuplicates count2="+dropDuplicates.count());
> Dataset filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 
> and (status = 0 or status = 1)");
> System.out.println("filter count1="+filter.count());
> System.out.println("filter count2="+filter.count());
> System.out.println("filter count3="+filter.count());
> System.out.println("filter count4="+filter.count());
> System.out.println("filter count5="+filter.count());
>  
>  
> --The above is code 
> ---
>  
>  
> console output:
> source count=459275
> dropDuplicates count1=453987
> dropDuplicates count2=453987
> filter count1=445798
> filter count2=445797
> filter count3=445797
> filter count4=445798
> filter count5=445799
>  
> question:
>  
> Why is filter.count() different every time?
> If I remove dropDuplicates(), everything is fine!
>  





[jira] [Commented] (SPARK-26777) SQL worked in 2.3.2 and fails in 2.4.0

2019-01-29 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755749#comment-16755749
 ] 

Hyukjin Kwon commented on SPARK-26777:
--

Please narrow down the problem and describe your analysis, with a self-contained 
reproducer if possible. As filed, this reads like a request for investigation.
Please reopen after adding a sufficiently narrowed-down analysis and a 
self-contained reproducer.

> SQL worked in 2.3.2 and fails in 2.4.0
> --
>
> Key: SPARK-26777
> URL: https://issues.apache.org/jira/browse/SPARK-26777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Yuri Budilov
>Priority: Major
>
> Following SQL worked in Spark 2.3.2 and now fails on 2.4.0 (AWS EMR Spark)
>  PySpark call below:
> spark.sql("select partition_year_utc,partition_month_utc,partition_day_utc \
> from datalake_reporting.copy_of_leads_notification \
> where partition_year_utc = (select max(partition_year_utc) from 
> datalake_reporting.copy_of_leads_notification) \
> and partition_month_utc = \
>  (select max(partition_month_utc) from 
> datalake_reporting.copy_of_leads_notification as m \
>  where \
>  m.partition_year_utc = (select max(partition_year_utc) from 
> datalake_reporting.copy_of_leads_notification)) \
>  and partition_day_utc = (select max(d.partition_day_utc) from 
> datalake_reporting.copy_of_leads_notification as d \
>  where d.partition_month_utc = \
>  (select max(m1.partition_month_utc) from 
> datalake_reporting.copy_of_leads_notification as m1 \
>  where m1.partition_year_utc = \
>  (select max(y.partition_year_utc) from 
> datalake_reporting.copy_of_leads_notification as y) \
>  ) \
>  ) \
>  order by 1 desc, 2 desc, 3 desc limit 1 ").show(1,False)
> Error (no data is needed; this is a syntax-level failure):
> py4j.protocol.Py4JJavaError: An error occurred while calling o1326.showString.
> : java.lang.UnsupportedOperationException: Cannot evaluate expression: 
> scalar-subquery#4495 []
>  
> Note: all 3 columns in query are Partitioned columns - see bottom of the 
> schema)
>  
> Hive EMR AWS Schema is:
>  
> CREATE EXTERNAL TABLE `copy_of_leads_notification`(
> `message.environment.siteorigin` string, `dcpheader.dcploaddateutc` string, 
> `message.id` int, `source.properties._country` string, `message.created` 
> string, `dcpheader.generatedmessageid` string, `message.tags` bigint, 
> `source.properties._enqueuedtimeutc` string, `source.properties._leadtype` 
> string, `message.itemid` string, `message.prospect.postcode` string, 
> `message.prospect.email` string, `message.referenceid` string, 
> `message.item.year` string, `message.identifier` string, 
> `dcpheader.dcploadmonthutc` string, `message.processed` string, 
> `source.properties._tenant` string, `message.item.price` string, 
> `message.subscription.confirmresponse` boolean, `message.itemtype` string, 
> `message.prospect.lastname` string, `message.subscription.insurancequote` 
> boolean, `source.exchangename` string, 
> `message.prospect.identificationnumbers` bigint, 
> `message.environment.ipaddress` string, `dcpheader.dcploaddayutc` string, 
> `source.properties._itemtype` string, `source.properties._requesttype` 
> string, `message.item.make` string, `message.prospect.firstname` string, 
> `message.subscription.survey` boolean, `message.prospect.homephone` string, 
> `message.extendedproperties` bigint, `message.subscription.financequote` 
> boolean, `message.uniqueidentifier` string, `source.properties._id` string, 
> `dcpheader.sourcemessageguid` string, `message.requesttype` string, 
> `source.routingkey` string, `message.service` string, `message.item.model` 
> string, `message.environment.pagesource` string, `source.source` string, 
> `message.sellerid` string, `partition_date_utc` string, 
> `message.selleridentifier` string, `message.subscription.newsletter` boolean, 
> `dcpheader.dcploadyearutc` string, `message.leadtype` string, 
> `message.history` bigint, `message.callconnect.calloutcome` string, 
> `message.callconnect.datecreatedutc` string, 
> `message.callconnect.callrecordingurl` string, 
> `message.callconnect.transferoutcome` string, 
> `message.callconnect.hiderecording` boolean, 
> `message.callconnect.callstartutc` string, `message.callconnect.code` string, 
> `message.callconnect.callduration` string, `message.fraudnetinfo` string, 
> `message.callconnect.answernumber` string, `message.environment.sourcedevice` 
> string, `message.comments` string, `message.fraudinfo.servervariables` 
> bigint, `message.callconnect.servicenumber` string, 
> `message.callconnect.callid` string, `message.callconnect.voicemailurl` 
> string, `message.item.stocknumber` string, 
> `message.callconnect.answerduration` string, `message.callconnect.callendutc` 
> string, `message.item.series` 

[jira] [Resolved] (SPARK-26777) SQL worked in 2.3.2 and fails in 2.4.0

2019-01-29 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26777.
--
Resolution: Incomplete

> SQL worked in 2.3.2 and fails in 2.4.0
> --
>
> Key: SPARK-26777
> URL: https://issues.apache.org/jira/browse/SPARK-26777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Yuri Budilov
>Priority: Major
>
> Following SQL worked in Spark 2.3.2 and now fails on 2.4.0 (AWS EMR Spark)
>  PySpark call below:
> spark.sql("select partition_year_utc,partition_month_utc,partition_day_utc \
> from datalake_reporting.copy_of_leads_notification \
> where partition_year_utc = (select max(partition_year_utc) from 
> datalake_reporting.copy_of_leads_notification) \
> and partition_month_utc = \
>  (select max(partition_month_utc) from 
> datalake_reporting.copy_of_leads_notification as m \
>  where \
>  m.partition_year_utc = (select max(partition_year_utc) from 
> datalake_reporting.copy_of_leads_notification)) \
>  and partition_day_utc = (select max(d.partition_day_utc) from 
> datalake_reporting.copy_of_leads_notification as d \
>  where d.partition_month_utc = \
>  (select max(m1.partition_month_utc) from 
> datalake_reporting.copy_of_leads_notification as m1 \
>  where m1.partition_year_utc = \
>  (select max(y.partition_year_utc) from 
> datalake_reporting.copy_of_leads_notification as y) \
>  ) \
>  ) \
>  order by 1 desc, 2 desc, 3 desc limit 1 ").show(1,False)
> Error (no data is needed; this is a syntax-level failure):
> py4j.protocol.Py4JJavaError: An error occurred while calling o1326.showString.
> : java.lang.UnsupportedOperationException: Cannot evaluate expression: 
> scalar-subquery#4495 []
>  
> Note: all 3 columns in query are Partitioned columns - see bottom of the 
> schema)
>  
> Hive EMR AWS Schema is:
>  
> CREATE EXTERNAL TABLE `copy_of_leads_notification`(
> `message.environment.siteorigin` string, `dcpheader.dcploaddateutc` string, 
> `message.id` int, `source.properties._country` string, `message.created` 
> string, `dcpheader.generatedmessageid` string, `message.tags` bigint, 
> `source.properties._enqueuedtimeutc` string, `source.properties._leadtype` 
> string, `message.itemid` string, `message.prospect.postcode` string, 
> `message.prospect.email` string, `message.referenceid` string, 
> `message.item.year` string, `message.identifier` string, 
> `dcpheader.dcploadmonthutc` string, `message.processed` string, 
> `source.properties._tenant` string, `message.item.price` string, 
> `message.subscription.confirmresponse` boolean, `message.itemtype` string, 
> `message.prospect.lastname` string, `message.subscription.insurancequote` 
> boolean, `source.exchangename` string, 
> `message.prospect.identificationnumbers` bigint, 
> `message.environment.ipaddress` string, `dcpheader.dcploaddayutc` string, 
> `source.properties._itemtype` string, `source.properties._requesttype` 
> string, `message.item.make` string, `message.prospect.firstname` string, 
> `message.subscription.survey` boolean, `message.prospect.homephone` string, 
> `message.extendedproperties` bigint, `message.subscription.financequote` 
> boolean, `message.uniqueidentifier` string, `source.properties._id` string, 
> `dcpheader.sourcemessageguid` string, `message.requesttype` string, 
> `source.routingkey` string, `message.service` string, `message.item.model` 
> string, `message.environment.pagesource` string, `source.source` string, 
> `message.sellerid` string, `partition_date_utc` string, 
> `message.selleridentifier` string, `message.subscription.newsletter` boolean, 
> `dcpheader.dcploadyearutc` string, `message.leadtype` string, 
> `message.history` bigint, `message.callconnect.calloutcome` string, 
> `message.callconnect.datecreatedutc` string, 
> `message.callconnect.callrecordingurl` string, 
> `message.callconnect.transferoutcome` string, 
> `message.callconnect.hiderecording` boolean, 
> `message.callconnect.callstartutc` string, `message.callconnect.code` string, 
> `message.callconnect.callduration` string, `message.fraudnetinfo` string, 
> `message.callconnect.answernumber` string, `message.environment.sourcedevice` 
> string, `message.comments` string, `message.fraudinfo.servervariables` 
> bigint, `message.callconnect.servicenumber` string, 
> `message.callconnect.callid` string, `message.callconnect.voicemailurl` 
> string, `message.item.stocknumber` string, 
> `message.callconnect.answerduration` string, `message.callconnect.callendutc` 
> string, `message.item.series` string, `message.item.detailsurl` string, 
> `message.item.pricetype` string, `message.item.description` string, 
> `message.item.colour` string, `message.item.badge` string, 
> `message.item.odometer` string, `message.environment.requestheader` string, 
> 

[jira] [Resolved] (SPARK-26779) NullPointerException when disable wholestage codegen

2019-01-29 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26779.
--
Resolution: Incomplete

Please reopen this after filling in the requested details.

> NullPointerException when disable wholestage codegen
> 
>
> Key: SPARK-26779
> URL: https://issues.apache.org/jira/browse/SPARK-26779
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiaoju Wu
>Priority: Trivial
>
> When running TPCDSSuite with wholestage codegen disabled, NPE is thrown in q9:
> java.lang.NullPointerException at
> org.apache.spark.sql.execution.FileSourceScanExec.(DataSourceScanExec.scala:170)
>  at 
> org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:613)
>  at 
> org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:160)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.immutable.List.map(List.scala:285) at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.immutable.List.map(List.scala:285) at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.immutable.List.map(List.scala:285) at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.immutable.List.map(List.scala:285) at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> 

[jira] [Commented] (SPARK-26779) NullPointerException when disable wholestage codegen

2019-01-29 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755746#comment-16755746
 ] 

Hyukjin Kwon commented on SPARK-26779:
--

Please don't just copy and paste. Describe your analysis, whether the failure 
is persistent, and how to reproduce it (step by step).
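
For reference, a minimal sketch of the kind of step-by-step reproducer being asked for. Everything here is an assumed setup, not taken from the report: the TPC-DS tables are presumed to be registered as temp views, and the q9 SQL text is omitted.

{code:scala}
import org.apache.spark.sql.SparkSession

// 1. Build a session with whole-stage codegen disabled (the config key is the
//    standard one; the rest of this setup is assumed for illustration).
val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.sql.codegen.wholeStage", "false")
  .getOrCreate()

// 2. Register the TPC-DS tables as temp views (not shown), then run q9.
val tpcdsQ9Text: String = ???   // paste the TPC-DS q9 query text here (omitted)
spark.sql(tpcdsQ9Text).collect()
{code}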

> NullPointerException when disable wholestage codegen
> 
>
> Key: SPARK-26779
> URL: https://issues.apache.org/jira/browse/SPARK-26779
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiaoju Wu
>Priority: Trivial
>
> When running TPCDSSuite with wholestage codegen disabled, NPE is thrown in q9:
> java.lang.NullPointerException at
> org.apache.spark.sql.execution.FileSourceScanExec.(DataSourceScanExec.scala:170)
>  at 
> org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:613)
>  at 
> org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:160)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.immutable.List.map(List.scala:285) at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.immutable.List.map(List.scala:285) at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.immutable.List.map(List.scala:285) at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.immutable.List.map(List.scala:285) at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
>  at 
> 

[jira] [Created] (SPARK-26779) NullPointerException when disable wholestage codegen

2019-01-29 Thread Xiaoju Wu (JIRA)
Xiaoju Wu created SPARK-26779:
-

 Summary: NullPointerException when disable wholestage codegen
 Key: SPARK-26779
 URL: https://issues.apache.org/jira/browse/SPARK-26779
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Xiaoju Wu


When running TPCDSSuite with wholestage codegen disabled, NPE is thrown in q9:

java.lang.NullPointerException at

org.apache.spark.sql.execution.FileSourceScanExec.(DataSourceScanExec.scala:170)
 at 
org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:613)
 at 
org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:160)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.immutable.List.map(List.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.immutable.List.map(List.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.immutable.List.map(List.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.immutable.List.map(List.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.immutable.List.map(List.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 
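For anyone trying to reproduce this outside of TPCDSSuite, wholestage codegen can be 
disabled at the session level. A minimal PySpark sketch of that configuration (it does 
not set up the TPCDS tables or q9 itself, and the app name is illustrative):

{code:python}
from pyspark.sql import SparkSession

# Minimal sketch: start a session with wholestage codegen disabled, which is the
# configuration the report above refers to.
spark = (SparkSession.builder
         .appName("wholestage-codegen-disabled")
         .config("spark.sql.codegen.wholeStage", "false")
         .getOrCreate())

# Queries planned in this session go through the non-wholestage execution path.
spark.range(10).selectExpr("id", "id * 2 AS twice").explain()
{code}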

[jira] [Created] (SPARK-26778) Remove rule `FallbackOrcDataSourceV2` when catalog support of file data source v2 is finished

2019-01-29 Thread Gengliang Wang (JIRA)
Gengliang Wang created SPARK-26778:
--

 Summary: Remove rule `FallbackOrcDataSourceV2` when catalog 
support of file data source v2 is finished
 Key: SPARK-26778
 URL: https://issues.apache.org/jira/browse/SPARK-26778
 Project: Spark
  Issue Type: Task
  Components: SQL
Affects Versions: 3.0.0
Reporter: Gengliang Wang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-25420) Dataset.count() every time is different.

2019-01-29 Thread Jeffrey (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755709#comment-16755709
 ] 

Jeffrey edited comment on SPARK-25420 at 1/30/19 5:59 AM:
--

[~kabhwan] I cannot share the dataset since it is owned by my clients. I could 
elaborate more on the scenarios:

 
 >>drkcard_0_df = 
 >>spark.read.csv("""[wasbs://e...@okprodstorage.blob.core.windows.net/AAA/WICTW/raw/TXN/POS/2018/09/**/*.gz]""")
  
 The dataset carries more than 100,000 records.
  
 >>drkcard_0_df.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and 
 >>_c2='83' and _c4='180919212732008200218'").show(2000,False)
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|Tom|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|James|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence|

 
 >>dropDup_0=drkcard_0_df.dropDuplicates(["_c0","_c1","_c2","_c3","_c4"])
  
 Running the where for 1st time: {color:#ff}Bug: Group C was mistakenly 
dropped.{color}
 >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and 
 >>_c2='83' and _c4='180919212732008200218'").show(2000,False)
  
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary|

 
 Running the where again: {color:#ff}Acceptable result{color}
  
  >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and 
_c2='83' and _c4='180919212732008200218'").show(2000,False)
  
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence|

 
 Running the where again:{color:#ff}Bug: Group A and C were mistakenly 
dropped.{color}
  >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and 
_c2='83' and _c4='180919212732008200218'").show(2000,False)
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel|

 

If I continue to repeat the where statement, the number of rows keeps changing.


was (Author: jeffrey.mak):
[~kabhwan] I cannot share the dataset since it is owned by my clients. I could 
elaborate more on the scenarios:

 
>>drkcard_0_df = 
>>spark.read.csv("""[wasbs://e...@okprodstorage.blob.core.windows.net/AAA/WICTW/raw/TXN/POS/2018/09/**/*.gz|wasbs://l...@aswprodeastorage.blob.core.windows.net/ASW/WTCTW/raw/TlogParser/PARSED_TLOG/2018/09/**/*.gz]""")
 
The dataset carries > 100,000 of records.
 
>>drkcard_0_df.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and 
>>_c2='83' and _c4='180919212732008200218'").show(2000,False)
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|Tom|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|James|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence|

 
>>dropDup_0=drkcard_0_df.dropDuplicates(["_c0","_c1","_c2","_c3","_c4"])
 
Running the where for 1st time: {color:#FF}Bug: Group C was mistakenly 
dropped.{color}
>>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and _c2='83' 
>>and _c4='180919212732008200218'").show(2000,False)
 
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary|
 
Running the where again: {color:#FF}Acceptable result{color}
 
 >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and 
_c2='83' and _c4='180919212732008200218'").show(2000,False)
 
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence|
 
Running the where again:{color:#FF}Bug: Group A and C were mistakenly 
dropped.{color}
 >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and 
_c2='83' and _c4='180919212732008200218'").show(2000,False)
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel|

 

Continue to repeat running the where statement, the number of rows keep 
changing.

> Dataset.count()  every time is different.
> -
>
> Key: 

[jira] [Updated] (SPARK-26778) Implement file source V2 partitioning

2019-01-29 Thread Gengliang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-26778:
---
Summary: Implement file source V2 partitioning   (was: Remove rule 
`FallbackOrcDataSourceV2` when catalog support of file data source v2 is 
finished)

> Implement file source V2 partitioning 
> --
>
> Key: SPARK-26778
> URL: https://issues.apache.org/jira/browse/SPARK-26778
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25420) Dataset.count() every time is different.

2019-01-29 Thread Jeffrey (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755709#comment-16755709
 ] 

Jeffrey commented on SPARK-25420:
-

[~kabhwan] I cannot share the dataset since it is owned by my clients. I could 
elaborate more on the scenarios:

 
>>drkcard_0_df = 
>>spark.read.csv("""[wasbs://e...@okprodstorage.blob.core.windows.net/AAA/WICTW/raw/TXN/POS/2018/09/**/*.gz|wasbs://l...@aswprodeastorage.blob.core.windows.net/ASW/WTCTW/raw/TlogParser/PARSED_TLOG/2018/09/**/*.gz]""")
 
The dataset carries more than 100,000 records.
 
>>drkcard_0_df.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and 
>>_c2='83' and _c4='180919212732008200218'").show(2000,False)
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|Tom|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|James|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence|

 
>>dropDup_0=drkcard_0_df.dropDuplicates(["_c0","_c1","_c2","_c3","_c4"])
 
Running the where for 1st time: {color:#FF}Bug: Group C was mistakenly 
dropped.{color}
>>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and _c2='83' 
>>and _c4='180919212732008200218'").show(2000,False)
 
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary|
 
Running the where again: {color:#FF}Acceptable result{color}
 
 >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and 
_c2='83' and _c4='180919212732008200218'").show(2000,False)
 
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence|
 
Running the where again:{color:#FF}Bug: Group A and C were mistakenly 
dropped.{color}
 >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and 
_c2='83' and _c4='180919212732008200218'").show(2000,False)
|_c0|_c1|_c2|_c3|_c4|_c5|
|2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel|

 

If I continue to repeat the where statement, the number of rows keeps changing.

> Dataset.count()  every time is different.
> -
>
> Key: SPARK-25420
> URL: https://issues.apache.org/jira/browse/SPARK-25420
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 2.3.0
> Environment: spark2.3
> standalone
>Reporter: huanghuai
>Priority: Major
>  Labels: SQL
>
> Dataset dataset = sparkSession.read().format("csv").option("sep", 
> ",").option("inferSchema", "true")
>  .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>  .option("encoding", "UTF-8")
>  .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count="+dataset.count());
> Dataset dropDuplicates = dataset.dropDuplicates(new 
> String[]\{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1="+dropDuplicates.count());
> System.out.println("dropDuplicates count2="+dropDuplicates.count());
> Dataset filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 
> and (status = 0 or status = 1)");
> System.out.println("filter count1="+filter.count());
> System.out.println("filter count2="+filter.count());
> System.out.println("filter count3="+filter.count());
> System.out.println("filter count4="+filter.count());
> System.out.println("filter count5="+filter.count());
>  
>  
> --The above is code 
> ---
>  
>  
> console output:
> source count=459275
> dropDuplicates count1=453987
> dropDuplicates count2=453987
> filter count1=445798
> filter count2=445797
> filter count3=445797
> filter count4=445798
> filter count5=445799
>  
> question:
>  
> Why is filter.count() different everytime?
> if I remove dropDuplicates() everything will be ok!!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26777) SQL worked in 2.3.2 and fails in 2.4.0

2019-01-29 Thread Yuri Budilov (JIRA)
Yuri Budilov created SPARK-26777:


 Summary: SQL worked in 2.3.2 and fails in 2.4.0
 Key: SPARK-26777
 URL: https://issues.apache.org/jira/browse/SPARK-26777
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Yuri Budilov


The following SQL worked in Spark 2.3.2 and now fails on 2.4.0 (AWS EMR Spark).

 PySpark call below:

spark.sql("select partition_year_utc,partition_month_utc,partition_day_utc \
from datalake_reporting.copy_of_leads_notification \
where partition_year_utc = (select max(partition_year_utc) from 
datalake_reporting.copy_of_leads_notification) \
and partition_month_utc = \
 (select max(partition_month_utc) from 
datalake_reporting.copy_of_leads_notification as m \
 where \
 m.partition_year_utc = (select max(partition_year_utc) from 
datalake_reporting.copy_of_leads_notification)) \
 and partition_day_utc = (select max(d.partition_day_utc) from 
datalake_reporting.copy_of_leads_notification as d \
 where d.partition_month_utc = \
 (select max(m1.partition_month_utc) from 
datalake_reporting.copy_of_leads_notification as m1 \
 where m1.partition_year_utc = \
 (select max(y.partition_year_utc) from 
datalake_reporting.copy_of_leads_notification as y) \
 ) \
 ) \
 order by 1 desc, 2 desc, 3 desc limit 1 ").show(1,False)

Error (no data is needed to reproduce this; the failure is in the SQL handling itself):
py4j.protocol.Py4JJavaError: An error occurred while calling o1326.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: 
scalar-subquery#4495 []

 

Note: all 3 columns in the query are partitioned columns (see the bottom of the schema).
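A possible workaround sketch while the scalar-subquery evaluation fails: the nested 
subqueries all resolve to the latest (year, month, day) partition, which can be 
expressed without correlated scalar subqueries. This is not a literal rewrite of the 
predicates above, and it assumes the partition values are zero-padded strings so that 
lexicographic order matches chronological order:

{code:python}
# Sketch of an equivalent-in-intent query: pick the most recent
# (year, month, day) partition combination without scalar subqueries.
latest = spark.sql("""
    SELECT partition_year_utc, partition_month_utc, partition_day_utc
    FROM datalake_reporting.copy_of_leads_notification
    GROUP BY partition_year_utc, partition_month_utc, partition_day_utc
    ORDER BY 1 DESC, 2 DESC, 3 DESC
    LIMIT 1
""")
latest.show(1, False)
{code}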

 

Hive EMR AWS Schema is:

 

CREATE EXTERNAL TABLE `copy_of_leads_notification`(

`message.environment.siteorigin` string, `dcpheader.dcploaddateutc` string, 
`message.id` int, `source.properties._country` string, `message.created` 
string, `dcpheader.generatedmessageid` string, `message.tags` bigint, 
`source.properties._enqueuedtimeutc` string, `source.properties._leadtype` 
string, `message.itemid` string, `message.prospect.postcode` string, 
`message.prospect.email` string, `message.referenceid` string, 
`message.item.year` string, `message.identifier` string, 
`dcpheader.dcploadmonthutc` string, `message.processed` string, 
`source.properties._tenant` string, `message.item.price` string, 
`message.subscription.confirmresponse` boolean, `message.itemtype` string, 
`message.prospect.lastname` string, `message.subscription.insurancequote` 
boolean, `source.exchangename` string, `message.prospect.identificationnumbers` 
bigint, `message.environment.ipaddress` string, `dcpheader.dcploaddayutc` 
string, `source.properties._itemtype` string, `source.properties._requesttype` 
string, `message.item.make` string, `message.prospect.firstname` string, 
`message.subscription.survey` boolean, `message.prospect.homephone` string, 
`message.extendedproperties` bigint, `message.subscription.financequote` 
boolean, `message.uniqueidentifier` string, `source.properties._id` string, 
`dcpheader.sourcemessageguid` string, `message.requesttype` string, 
`source.routingkey` string, `message.service` string, `message.item.model` 
string, `message.environment.pagesource` string, `source.source` string, 
`message.sellerid` string, `partition_date_utc` string, 
`message.selleridentifier` string, `message.subscription.newsletter` boolean, 
`dcpheader.dcploadyearutc` string, `message.leadtype` string, `message.history` 
bigint, `message.callconnect.calloutcome` string, 
`message.callconnect.datecreatedutc` string, 
`message.callconnect.callrecordingurl` string, 
`message.callconnect.transferoutcome` string, 
`message.callconnect.hiderecording` boolean, `message.callconnect.callstartutc` 
string, `message.callconnect.code` string, `message.callconnect.callduration` 
string, `message.fraudnetinfo` string, `message.callconnect.answernumber` 
string, `message.environment.sourcedevice` string, `message.comments` string, 
`message.fraudinfo.servervariables` bigint, `message.callconnect.servicenumber` 
string, `message.callconnect.callid` string, `message.callconnect.voicemailurl` 
string, `message.item.stocknumber` string, `message.callconnect.answerduration` 
string, `message.callconnect.callendutc` string, `message.item.series` string, 
`message.item.detailsurl` string, `message.item.pricetype` string, 
`message.item.description` string, `message.item.colour` string, 
`message.item.badge` string, `message.item.odometer` string, 
`message.environment.requestheader` string, `message.item.registrationnumber` 
string, `message.item.bodytype` string, `message.item.fueltype` string, 
`message.item.redbookcode` string, `message.item.spotid` string, 
`message.item.id` string, `message.item.transmission` string, 
`message.item.vin` string, `message.item.enginedescription` string, 
`message.prospect.mobilephone` string, 

[jira] [Assigned] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check

2019-01-29 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-26776:
---

Assignee: Hyukjin Kwon

> Reduce Py4J communication cost in PySpark's execution barrier check
> ---
>
> Key: SPARK-26776
> URL: https://issues.apache.org/jira/browse/SPARK-26776
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Hyukjin Kwon
>Assignee: Hyukjin Kwon
>Priority: Minor
>
> I am investigating flaky tests. I realised that:
> {code}
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2512, in __init__
> self.is_barrier = prev._is_barrier() or isFromBarrier
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2412, in _is_barrier
> return self._jrdd.rdd().isBarrier()
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 342, in get_return_value
> return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 2492, in 
> lambda target_id, gateway_client: JavaObject(target_id, 
> gateway_client))
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1324, in __init__
> ThreadSafeFinalizer.add_finalizer(key, value)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py",
>  line 43, in add_finalizer
> cls.finalizers[id] = weak_ref
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in 
> __exit__
> self.release()
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in 
> release
> self.__block.release()
> error: release unlocked lock
> {code}
> I assume it might not be directly related with the test itself but I noticed 
> that it prev._is_barrier() attempts to access via Py4J.
> Accessing via Py4J is expensive and IMHO it makes it flaky.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check

2019-01-29 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-26776.
-
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23690
[https://github.com/apache/spark/pull/23690]

> Reduce Py4J communication cost in PySpark's execution barrier check
> ---
>
> Key: SPARK-26776
> URL: https://issues.apache.org/jira/browse/SPARK-26776
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Hyukjin Kwon
>Assignee: Hyukjin Kwon
>Priority: Minor
> Fix For: 3.0.0
>
>
> I am investigating flaky tests. I realised that:
> {code}
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2512, in __init__
> self.is_barrier = prev._is_barrier() or isFromBarrier
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2412, in _is_barrier
> return self._jrdd.rdd().isBarrier()
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 342, in get_return_value
> return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 2492, in 
> lambda target_id, gateway_client: JavaObject(target_id, 
> gateway_client))
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1324, in __init__
> ThreadSafeFinalizer.add_finalizer(key, value)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py",
>  line 43, in add_finalizer
> cls.finalizers[id] = weak_ref
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in 
> __exit__
> self.release()
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in 
> release
> self.__block.release()
> error: release unlocked lock
> {code}
> I assume it might not be directly related with the test itself but I noticed 
> that it prev._is_barrier() attempts to access via Py4J.
> Accessing via Py4J is expensive and IMHO it makes it flaky.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26726) Synchronize the amount of memory used by the broadcast variable to the UI display

2019-01-29 Thread hantiantian (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hantiantian updated SPARK-26726:

Summary:   Synchronize the amount of memory used by the broadcast variable 
to the UI display  (was:  The amount of memory used by the broadcast variable 
is not synchronized to the UI display)

>   Synchronize the amount of memory used by the broadcast variable to the UI 
> display
> ---
>
> Key: SPARK-26726
> URL: https://issues.apache.org/jira/browse/SPARK-26726
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: hantiantian
>Priority: Major
>
> The amount of memory used by the broadcast variable is not synchronized to 
> the UI display:
> spark-sql>  select /*+ broadcast(a)*/ a.id,b.id from a join b on a.id = b.id;
> View the app's driver log:
> 2019-01-25 16:45:23,726 INFO org.apache.spark.storage.BlockManagerInfo: Added 
> broadcast_4_piece0 in memory on 10.43.xx.xx:33907 (size: 6.6 KB, free: 2.5 GB)
>  2019-01-25 16:45:23,727 INFO org.apache.spark.storage.BlockManagerInfo: 
> Added broadcast_4_piece0 in memory on 10.43.xx.xx:38399 (size: 6.6 KB, free: 
> 2.5 GB)
>  2019-01-25 16:45:23,745 INFO org.apache.spark.storage.BlockManagerInfo: 
> Added broadcast_3_piece0 in memory on 10.43.xx.xx:33907 (size: 32.1 KB, free: 
> 2.5 GB)
>  2019-01-25 16:45:23,749 INFO org.apache.spark.storage.BlockManagerInfo: 
> Added broadcast_3_piece0 in memory on 10.43.xx.xx:38399 (size: 32.1 KB, free: 
> 2.5 GB)
>  2019-01-25 16:45:23,838 INFO org.apache.spark.storage.BlockManagerInfo: 
> Added broadcast_2_piece0 in memory on 10.43.xx.xx:38399 (size: 147.0 B, free: 
> 2.5 GB)
>  2019-01-25 16:45:23,840 INFO org.apache.spark.storage.BlockManagerInfo: 
> Added broadcast_2_piece0 in memory on 10.43.xx.xx:33907 (size: 147.0 B, free: 
> 2.5 GB)
>  
> The Web UI does not show the memory usage:
> ||Executor ID||Address||Status||RDD Blocks||Storage Memory||Disk 
> Used||Cores||Active Tasks||Failed Tasks||Complete Tasks||Total Tasks||Task 
> Time (GC Time)||Input||Shuffle Read||Shuffle Write||Logs||Thread Dump||
> |0|xxx:38399|Active|0|0.0 B / 2.7 GB|0.0 B|1|0|0|2|2|4 s (0.4 s)|8 B|0.0 
> B|0.0 
> B|[stdout|http://10.43.183.120:18085/logPage/?appId=app-20190125164426-0003=0=stdout]
>  
> [stderr|http://10.43.183.120:18085/logPage/?appId=app-20190125164426-0003=0=stderr]|[Thread
>  Dump|http://10.43.183.121:4040/executors/threadDump/?executorId=0]|
> |driver|xxx:47936|Active|0|0.0 B / 384.1 MB|0.0 B|0|0|0|0|0|0.0 ms (0.0 
> ms)|0.0 B|0.0 B|0.0 B| |[Thread 
> Dump|http://10.43.183.121:4040/executors/threadDump/?executorId=driver]|
> |1|xxx:47414|Active|0|0.0 B / 2.7 GB|0.0 B|1|0|0|0|0|0.0 ms (0.0 ms)|0.0 
> B|0.0 B|0.0 
> B|[stdout|http://10.43.183.121:18085/logPage/?appId=app-20190125164426-0003=1=stdout]
>  
> [stderr|http://10.43.183.121:18085/logPage/?appId=app-20190125164426-0003=1=stderr]|[Thread
>  Dump|http://10.43.183.121:4040/executors/threadDump/?executorId=1]|
> |2|xxx:33907|Active|0|0.0 B / 2.7 GB|0.0 B|1|0|0|2|2|4 s (0.2 s)|4 B|0.0 
> B|0.0 
> B|[stdout|http://10.43.183.122:18085/logPage/?appId=app-20190125164426-0003=2=stdout]
>  
> [stderr|http://10.43.183.122:18085/logPage/?appId=app-20190125164426-0003=2=stderr]|[Thread
>  Dump|http://10.43.183.121:4040/executors/threadDump/?executorId=2]|
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26776:


Assignee: Apache Spark

> Reduce Py4J communication cost in PySpark's execution barrier check
> ---
>
> Key: SPARK-26776
> URL: https://issues.apache.org/jira/browse/SPARK-26776
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Hyukjin Kwon
>Assignee: Apache Spark
>Priority: Minor
>
> I am investigating flaky tests. I realised that:
> {code}
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2512, in __init__
> self.is_barrier = prev._is_barrier() or isFromBarrier
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2412, in _is_barrier
> return self._jrdd.rdd().isBarrier()
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 342, in get_return_value
> return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 2492, in 
> lambda target_id, gateway_client: JavaObject(target_id, 
> gateway_client))
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1324, in __init__
> ThreadSafeFinalizer.add_finalizer(key, value)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py",
>  line 43, in add_finalizer
> cls.finalizers[id] = weak_ref
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in 
> __exit__
> self.release()
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in 
> release
> self.__block.release()
> error: release unlocked lock
> {code}
> I assume it might not be directly related with the test itself but I noticed 
> that it prev._is_barrier() attempts to access via Py4J.
> Accessing via Py4J is expensive and IMHO it makes it flaky.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check

2019-01-29 Thread Hyukjin Kwon (JIRA)
Hyukjin Kwon created SPARK-26776:


 Summary: Reduce Py4J communication cost in PySpark's execution 
barrier check
 Key: SPARK-26776
 URL: https://issues.apache.org/jira/browse/SPARK-26776
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.4.0, 3.0.0
Reporter: Hyukjin Kwon


I am investigating flaky tests. I realised that:

{code}
  File 
"/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
2512, in __init__
self.is_barrier = prev._is_barrier() or isFromBarrier
  File 
"/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
2412, in _is_barrier
return self._jrdd.rdd().isBarrier()
  File 
"/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py",
 line 342, in get_return_value
return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
  File 
"/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 2492, in 
lambda target_id, gateway_client: JavaObject(target_id, gateway_client))
  File 
"/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
 line 1324, in __init__
ThreadSafeFinalizer.add_finalizer(key, value)
  File 
"/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py",
 line 43, in add_finalizer
cls.finalizers[id] = weak_ref
  File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in 
__exit__
self.release()
  File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in 
release
self.__block.release()
error: release unlocked lock
{code}

I assume it might not be directly related to the test itself, but I noticed that 
prev._is_barrier() attempts to access the JVM via Py4J.

Accessing via Py4J is expensive and IMHO it makes the test flaky.
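As an illustration only (this is not the actual patch), the kind of change this points 
at is to pay the Py4J round trip once and keep the result on the Python side; the class 
and attribute names below are hypothetical:

{code:python}
# Illustrative sketch, not the actual fix: memoize the JVM-side barrier flag so
# repeated checks stay in Python instead of going through Py4J every time.
class BarrierFlagCache(object):
    def __init__(self, jrdd):
        self._jrdd = jrdd      # Py4J handle to the Java RDD (assumed)
        self._cached = None

    def is_barrier(self):
        if self._cached is None:
            # Single Py4J call; subsequent calls are pure Python.
            self._cached = self._jrdd.rdd().isBarrier()
        return self._cached
{code}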



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check

2019-01-29 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755605#comment-16755605
 ] 

Apache Spark commented on SPARK-26776:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/23690

> Reduce Py4J communication cost in PySpark's execution barrier check
> ---
>
> Key: SPARK-26776
> URL: https://issues.apache.org/jira/browse/SPARK-26776
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Hyukjin Kwon
>Priority: Minor
>
> I am investigating flaky tests. I realised that:
> {code}
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2512, in __init__
> self.is_barrier = prev._is_barrier() or isFromBarrier
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2412, in _is_barrier
> return self._jrdd.rdd().isBarrier()
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 342, in get_return_value
> return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 2492, in 
> lambda target_id, gateway_client: JavaObject(target_id, 
> gateway_client))
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1324, in __init__
> ThreadSafeFinalizer.add_finalizer(key, value)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py",
>  line 43, in add_finalizer
> cls.finalizers[id] = weak_ref
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in 
> __exit__
> self.release()
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in 
> release
> self.__block.release()
> error: release unlocked lock
> {code}
> I assume it might not be directly related with the test itself but I noticed 
> that it prev._is_barrier() attempts to access via Py4J.
> Accessing via Py4J is expensive and IMHO it makes it flaky.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26776:


Assignee: (was: Apache Spark)

> Reduce Py4J communication cost in PySpark's execution barrier check
> ---
>
> Key: SPARK-26776
> URL: https://issues.apache.org/jira/browse/SPARK-26776
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Hyukjin Kwon
>Priority: Minor
>
> I am investigating flaky tests. I realised that:
> {code}
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2512, in __init__
> self.is_barrier = prev._is_barrier() or isFromBarrier
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2412, in _is_barrier
> return self._jrdd.rdd().isBarrier()
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 342, in get_return_value
> return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 2492, in 
> lambda target_id, gateway_client: JavaObject(target_id, 
> gateway_client))
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1324, in __init__
> ThreadSafeFinalizer.add_finalizer(key, value)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py",
>  line 43, in add_finalizer
> cls.finalizers[id] = weak_ref
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in 
> __exit__
> self.release()
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in 
> release
> self.__block.release()
> error: release unlocked lock
> {code}
> I assume it might not be directly related with the test itself but I noticed 
> that it prev._is_barrier() attempts to access via Py4J.
> Accessing via Py4J is expensive and IMHO it makes it flaky.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check

2019-01-29 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755602#comment-16755602
 ] 

Apache Spark commented on SPARK-26776:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/23690

> Reduce Py4J communication cost in PySpark's execution barrier check
> ---
>
> Key: SPARK-26776
> URL: https://issues.apache.org/jira/browse/SPARK-26776
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Hyukjin Kwon
>Priority: Minor
>
> I am investigating flaky tests. I realised that:
> {code}
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2512, in __init__
> self.is_barrier = prev._is_barrier() or isFromBarrier
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 
> 2412, in _is_barrier
> return self._jrdd.rdd().isBarrier()
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 342, in get_return_value
> return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 2492, in 
> lambda target_id, gateway_client: JavaObject(target_id, 
> gateway_client))
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1324, in __init__
> ThreadSafeFinalizer.add_finalizer(key, value)
>   File 
> "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py",
>  line 43, in add_finalizer
> cls.finalizers[id] = weak_ref
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in 
> __exit__
> self.release()
>   File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in 
> release
> self.__block.release()
> error: release unlocked lock
> {code}
> I assume it might not be directly related with the test itself but I noticed 
> that it prev._is_barrier() attempts to access via Py4J.
> Accessing via Py4J is expensive and IMHO it makes it flaky.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25420) Dataset.count() every time is different.

2019-01-29 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755583#comment-16755583
 ] 

Jungtaek Lim commented on SPARK-25420:
--

[~jeffrey.mak]
Could you provide a sample dataset and query to reproduce the case?

Going by the description alone, this is clearly "not a bug": the order of rows is 
non-deterministic unless we put a sort on it, and dropDuplicates will keep only the 
first row of each group, which is also non-deterministic.

If you have a reproducible case which doesn't fall into this, please share it.
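For completeness, if a deterministic representative per group is needed, one option is 
a window with an explicit ordering instead of dropDuplicates. A sketch using the column 
names and DataFrame from the earlier messages, with _c5 as an arbitrary but explicit 
tie-breaker:

{code:python}
from pyspark.sql import Window
from pyspark.sql import functions as F

# Keep exactly one row per (_c0, _c1, _c2, _c3, _c4) group, chosen by an explicit
# ordering on _c5, so repeated runs return the same rows.
w = Window.partitionBy("_c0", "_c1", "_c2", "_c3", "_c4").orderBy(F.col("_c5"))
dedup = (drkcard_0_df
         .withColumn("rn", F.row_number().over(w))
         .where("rn = 1")
         .drop("rn"))
{code}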

> Dataset.count()  every time is different.
> -
>
> Key: SPARK-25420
> URL: https://issues.apache.org/jira/browse/SPARK-25420
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 2.3.0
> Environment: spark2.3
> standalone
>Reporter: huanghuai
>Priority: Major
>  Labels: SQL
>
> Dataset dataset = sparkSession.read().format("csv").option("sep", 
> ",").option("inferSchema", "true")
>  .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>  .option("encoding", "UTF-8")
>  .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count="+dataset.count());
> Dataset dropDuplicates = dataset.dropDuplicates(new 
> String[]\{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1="+dropDuplicates.count());
> System.out.println("dropDuplicates count2="+dropDuplicates.count());
> Dataset filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 
> and (status = 0 or status = 1)");
> System.out.println("filter count1="+filter.count());
> System.out.println("filter count2="+filter.count());
> System.out.println("filter count3="+filter.count());
> System.out.println("filter count4="+filter.count());
> System.out.println("filter count5="+filter.count());
>  
>  
> --The above is code 
> ---
>  
>  
> console output:
> source count=459275
> dropDuplicates count1=453987
> dropDuplicates count2=453987
> filter count1=445798
> filter count2=445797
> filter count3=445797
> filter count4=445798
> filter count5=445799
>  
> question:
>  
> Why is filter.count() different everytime?
> if I remove dropDuplicates() everything will be ok!!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25420) Dataset.count() every time is different.

2019-01-29 Thread Jeffrey (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755546#comment-16755546
 ] 

Jeffrey commented on SPARK-25420:
-

 [~mgaido] Could you elaborate on why this is not a bug?

dropDuplicates() is expected to drop only the rows that are duplicates in a 
dataframe. I experienced the same non-deterministic behavior when doing a filter 
after dropDuplicates over a subset of columns. Sometimes it returns fewer rows 
than I expected; that is, some rows which are not duplicates also got dropped.

 

> Dataset.count()  every time is different.
> -
>
> Key: SPARK-25420
> URL: https://issues.apache.org/jira/browse/SPARK-25420
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 2.3.0
> Environment: spark2.3
> standalone
>Reporter: huanghuai
>Priority: Major
>  Labels: SQL
>
> Dataset dataset = sparkSession.read().format("csv").option("sep", 
> ",").option("inferSchema", "true")
>  .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>  .option("encoding", "UTF-8")
>  .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count="+dataset.count());
> Dataset dropDuplicates = dataset.dropDuplicates(new 
> String[]\{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1="+dropDuplicates.count());
> System.out.println("dropDuplicates count2="+dropDuplicates.count());
> Dataset filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 
> and (status = 0 or status = 1)");
> System.out.println("filter count1="+filter.count());
> System.out.println("filter count2="+filter.count());
> System.out.println("filter count3="+filter.count());
> System.out.println("filter count4="+filter.count());
> System.out.println("filter count5="+filter.count());
>  
>  
> --The above is code 
> ---
>  
>  
> console output:
> source count=459275
> dropDuplicates count1=453987
> dropDuplicates count2=453987
> filter count1=445798
> filter count2=445797
> filter count3=445797
> filter count4=445798
> filter count5=445799
>  
> question:
>  
> Why is filter.count() different everytime?
> if I remove dropDuplicates() everything will be ok!!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26732) Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that actually persist data

2019-01-29 Thread Takeshi Yamamuro (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755515#comment-16755515
 ] 

Takeshi Yamamuro commented on SPARK-26732:
--

Thanks for pinging me, dongjoon!

> Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that 
> actually persist data
> ---
>
> Key: SPARK-26732
> URL: https://issues.apache.org/jira/browse/SPARK-26732
> Project: Spark
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 2.3.2, 2.4.0, 3.0.0
>Reporter: Marcelo Vanzin
>Priority: Major
>
> From 
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/5437/testReport/junit/org.apache.spark/SparkContextInfoSuite/getRDDStorageInfo_only_reports_on_RDDs_that_actually_persist_data/:
> {noformat}
> Error Message
> org.scalatest.exceptions.TestFailedException: 0 did not equal 1
> Stacktrace
> sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 0 did 
> not equal 1
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527)
>   at 
> org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   at 
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
>   at 
> org.apache.spark.SparkContextInfoSuite.$anonfun$new$3(SparkContextInfoSuite.scala:63)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26766) Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens

2019-01-29 Thread Marcelo Vanzin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755501#comment-16755501
 ] 

Marcelo Vanzin commented on SPARK-26766:


The only thing YARN-specific about {{hadoopFSsToAccess}} is the reference to 
{{STAGING_DIR}}. Even {{FILESYSTEMS_TO_ACCESS}} makes sense for other RMs; it 
could just be renamed to a more generic key (e.g. under {{spark.kerberos.}}).

To reduce complexity, I think it should be fine to just move 
{{hadoopFSsToAccess}} to {{HadoopFSDelegationTokenProvider}}. The only drawback 
is looking at {{STAGING_DIR}} outside of YARN; if that's a real concern, just 
add a check in the code that ignores it if the master is not "yarn".

> Remove the list of filesystems from 
> HadoopDelegationTokenProvider.obtainDelegationTokens
> 
>
> Key: SPARK-26766
> URL: https://issues.apache.org/jira/browse/SPARK-26766
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Priority: Minor
>
> This was discussed in previous PR 
> [here|https://github.com/apache/spark/pull/23499/files#diff-406f99efa37915001b613de47815e25cR54].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26732) Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that actually persist data

2019-01-29 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755412#comment-16755412
 ] 

Dongjoon Hyun commented on SPARK-26732:
---

cc [~maropu].

> Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that 
> actually persist data
> ---
>
> Key: SPARK-26732
> URL: https://issues.apache.org/jira/browse/SPARK-26732
> Project: Spark
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 2.3.2, 2.4.0, 3.0.0
>Reporter: Marcelo Vanzin
>Priority: Major
>
> From 
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/5437/testReport/junit/org.apache.spark/SparkContextInfoSuite/getRDDStorageInfo_only_reports_on_RDDs_that_actually_persist_data/:
> {noformat}
> Error Message
> org.scalatest.exceptions.TestFailedException: 0 did not equal 1
> Stacktrace
> sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 0 did 
> not equal 1
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527)
>   at 
> org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   at 
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
>   at 
> org.apache.spark.SparkContextInfoSuite.$anonfun$new$3(SparkContextInfoSuite.scala:63)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26732) Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that actually persist data

2019-01-29 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26732:
--
Affects Version/s: 2.3.2
   2.4.0

> Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that 
> actually persist data
> ---
>
> Key: SPARK-26732
> URL: https://issues.apache.org/jira/browse/SPARK-26732
> Project: Spark
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 2.3.2, 2.4.0, 3.0.0
>Reporter: Marcelo Vanzin
>Priority: Major
>
> From 
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/5437/testReport/junit/org.apache.spark/SparkContextInfoSuite/getRDDStorageInfo_only_reports_on_RDDs_that_actually_persist_data/:
> {noformat}
> Error Message
> org.scalatest.exceptions.TestFailedException: 0 did not equal 1
> Stacktrace
> sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 0 did 
> not equal 1
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527)
>   at 
> org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   at 
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
>   at 
> org.apache.spark.SparkContextInfoSuite.$anonfun$new$3(SparkContextInfoSuite.scala:63)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-25035) Replicating disk-stored blocks should avoid memory mapping

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-25035:


Assignee: (was: Apache Spark)

> Replicating disk-stored blocks should avoid memory mapping
> --
>
> Key: SPARK-25035
> URL: https://issues.apache.org/jira/browse/SPARK-25035
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Imran Rashid
>Priority: Major
>  Labels: memory-analysis
>
> This is a follow-up to SPARK-24296.
> When replicating a disk-cached block, even if we fetch-to-disk, we still 
> memory-map the file, just to copy it to another location.
> Ideally we'd just move the tmp file to the right location.  But even without 
> that, we could read the file as an input stream, instead of memory-mapping 
> the whole thing.  Memory-mapping is particularly a problem when running under 
> yarn, as the OS may believe there is plenty of memory available, meanwhile 
> yarn decides to kill the process for exceeding memory limits.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-25035) Replicating disk-stored blocks should avoid memory mapping

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-25035:


Assignee: Apache Spark

> Replicating disk-stored blocks should avoid memory mapping
> --
>
> Key: SPARK-25035
> URL: https://issues.apache.org/jira/browse/SPARK-25035
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Imran Rashid
>Assignee: Apache Spark
>Priority: Major
>  Labels: memory-analysis
>
> This is a follow-up to SPARK-24296.
> When replicating a disk-cached block, even if we fetch-to-disk, we still 
> memory-map the file, just to copy it to another location.
> Ideally we'd just move the tmp file to the right location.  But even without 
> that, we could read the file as an input stream, instead of memory-mapping 
> the whole thing.  Memory-mapping is particularly a problem when running under 
> yarn, as the OS may believe there is plenty of memory available, meanwhile 
> yarn decides to kill the process for exceeding memory limits.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26718) Fixed integer overflow in SS kafka rateLimit calculation

2019-01-29 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26718:
--
Fix Version/s: 2.4.1

> Fixed integer overflow in SS kafka rateLimit calculation
> 
>
> Key: SPARK-26718
> URL: https://issues.apache.org/jira/browse/SPARK-26718
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Ryne Yang
>Assignee: Ryne Yang
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
>
> When running Spark Structured Streaming with `"org.apache.spark" %% 
> "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting an error regarding current 
> offset fetching:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): 
> java.lang.AssertionError: assertion failed: latest offs
> et -9223372036854775808 does not equal -1
> at scala.Predef$.assert(Predef.scala:170)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.(KafkaMicroBatchReader.scala:329)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
> at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> For some reason, it looks like fetchLatestOffset returned Long.MIN_VALUE for 
> one of the partitions. I checked the structured streaming checkpoint and that 
> was correct; it is the currentAvailableOffset that was set to Long.MIN_VALUE.
> kafka broker version: 1.1.0.
> lib we used:
> {{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % 
> "2.4.0"}}
> how to reproduce:
> Basically we started a structured streaming query and subscribed to a topic of 
> 4 partitions, then produced some messages into the topic; the job crashed and 
> logged the stacktrace above.
> also the committed offsets seem fine as we see in the logs: 
> {code:java}
> === Streaming Query ===
> Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 
> 31878627-d473-4ee8-955d-d4d3f3f45eb9]
> Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":1}}}
> Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":-9223372036854775808}}}
> {code}
> So Spark streaming recorded the correct value for partition 0, but the current 
> available offset returned from Kafka is showing Long.MIN_VALUE.
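
A small illustration of how an unchecked Long computation can produce exactly the -9223372036854775808 seen above. This is a sketch of the failure mode only, not the actual rate-limit code, and the clampedAdd helper is hypothetical:

{code:scala}
// A prorated offset computed from a Double can saturate at Long.MaxValue ...
val prorated = (Long.MaxValue.toDouble * 2).toLong   // == Long.MaxValue
// ... and one more unchecked addition then wraps around to Long.MinValue:
val wrapped = prorated + 1                           // == -9223372036854775808

// A defensive variant clamps instead of wrapping on overflow:
def clampedAdd(a: Long, b: Long): Long = {
  val s = a + b
  if (((a ^ s) & (b ^ s)) < 0) {      // overflow: both inputs differ in sign from the sum
    if (a > 0) Long.MaxValue else Long.MinValue
  } else s
}

assert(clampedAdd(Long.MaxValue, 1L) == Long.MaxValue)
{code}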



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26775) Update Jenkins nodes to support local volumes for K8s integration tests

2019-01-29 Thread Stavros Kontopoulos (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stavros Kontopoulos updated SPARK-26775:

Description: 
The current version of Minikube on the test machines does not properly support 
the local persistent volume feature required by this PR: 
[https://github.com/apache/spark/pull/23514].

We get this error:

"spec.local: Forbidden: Local volumes are disabled by feature-gate, 
metadata.annotations: Required value: Local volume requires node affinity"

This is probably due to [https://github.com/rancher/rancher/issues/13864], 
which implies that we need to update to 1.10+ as described in 
[https://kubernetes.io/docs/concepts/storage/volumes/#local]. The Fabric8io client 
is already updated in the PR mentioned at the beginning.

  was:
Current version of Minikube on test machines does not support properly the 
local persistent volume feature required by this PR: 
https://github.com/apache/spark/pull/23514.

We get his error:

"spec.local: Forbidden: Local volumes are disabled by feature-gate, 
metadata.annotations: Required value: Local volume requires node affinity"

This is probably due to this: [https://github.com/rancher/rancher/issues/13864] 
which implies that we need to update to 1.10+ as described in 
[https://kubernetes.io/docs/concepts/storage/volumes/#loca,l|https://kubernetes.io/docs/concepts/storage/volumes/#local]
 since fabric8io client is already updated in the PR mentioned at the beginning.


> Update Jenkins nodes to support local volumes for K8s integration tests
> ---
>
> Key: SPARK-26775
> URL: https://issues.apache.org/jira/browse/SPARK-26775
> Project: Spark
>  Issue Type: Improvement
>  Components: jenkins, Kubernetes
>Affects Versions: 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> The current version of Minikube on the test machines does not properly support 
> the local persistent volume feature required by this PR: 
> [https://github.com/apache/spark/pull/23514].
> We get this error:
> "spec.local: Forbidden: Local volumes are disabled by feature-gate, 
> metadata.annotations: Required value: Local volume requires node affinity"
> This is probably due to 
> [https://github.com/rancher/rancher/issues/13864], which implies that we need 
> to update to 1.10+ as described in 
> [https://kubernetes.io/docs/concepts/storage/volumes/#local]. The Fabric8io 
> client is already updated in the PR mentioned at the beginning.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26775) Update Jenkins nodes to support local volumes for K8s integration tests

2019-01-29 Thread Stavros Kontopoulos (JIRA)
Stavros Kontopoulos created SPARK-26775:
---

 Summary: Update Jenkins nodes to support local volumes for K8s 
integration tests
 Key: SPARK-26775
 URL: https://issues.apache.org/jira/browse/SPARK-26775
 Project: Spark
  Issue Type: Improvement
  Components: jenkins, Kubernetes
Affects Versions: 3.0.0
Reporter: Stavros Kontopoulos


The current version of Minikube on the test machines does not properly support 
the local persistent volume feature required by this PR: 
https://github.com/apache/spark/pull/23514.

We get this error:

"spec.local: Forbidden: Local volumes are disabled by feature-gate, 
metadata.annotations: Required value: Local volume requires node affinity"

This is probably due to [https://github.com/rancher/rancher/issues/13864], 
which implies that we need to update to 1.10+ as described in 
[https://kubernetes.io/docs/concepts/storage/volumes/#local], 
since the fabric8io client is already updated in the PR mentioned at the beginning.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25035) Replicating disk-stored blocks should avoid memory mapping

2019-01-29 Thread Attila Zsolt Piros (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755322#comment-16755322
 ] 

Attila Zsolt Piros commented on SPARK-25035:


I am working on this.

> Replicating disk-stored blocks should avoid memory mapping
> --
>
> Key: SPARK-25035
> URL: https://issues.apache.org/jira/browse/SPARK-25035
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Imran Rashid
>Priority: Major
>  Labels: memory-analysis
>
> This is a follow-up to SPARK-24296.
> When replicating a disk-cached block, even if we fetch-to-disk, we still 
> memory-map the file, just to copy it to another location.
> Ideally we'd just move the tmp file to the right location.  But even without 
> that, we could read the file as an input stream, instead of memory-mapping 
> the whole thing.  Memory-mapping is particularly a problem when running under 
> yarn, as the OS may believe there is plenty of memory available, meanwhile 
> yarn decides to kill the process for exceeding memory limits.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26718) Fixed integer overflow in SS kafka rateLimit calculation

2019-01-29 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26718:
--
Summary: Fixed integer overflow in SS kafka rateLimit calculation  (was: 
structured streaming fetched wrong current offset from kafka)

> Fixed integer overflow in SS kafka rateLimit calculation
> 
>
> Key: SPARK-26718
> URL: https://issues.apache.org/jira/browse/SPARK-26718
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Ryne Yang
>Assignee: Ryne Yang
>Priority: Major
>
> When running Spark Structured Streaming with `"org.apache.spark" %% 
> "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting an error regarding current 
> offset fetching:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): 
> java.lang.AssertionError: assertion failed: latest offs
> et -9223372036854775808 does not equal -1
> at scala.Predef$.assert(Predef.scala:170)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.(KafkaMicroBatchReader.scala:329)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
> at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> For some reason, it looks like fetchLatestOffset returned Long.MIN_VALUE for 
> one of the partitions. I checked the structured streaming checkpoint and that 
> was correct; it is the currentAvailableOffset that was set to Long.MIN_VALUE.
> kafka broker version: 1.1.0.
> lib we used:
> {{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % 
> "2.4.0"}}
> how to reproduce:
> Basically we started a structured streaming query and subscribed to a topic of 
> 4 partitions, then produced some messages into the topic; the job crashed and 
> logged the stacktrace above.
> also the committed offsets seem fine as we see in the logs: 
> {code:java}
> === Streaming Query ===
> Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 
> 31878627-d473-4ee8-955d-d4d3f3f45eb9]
> Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":1}}}
> Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":-9223372036854775808}}}
> {code}
> So Spark streaming recorded the correct value for partition 0, but the current 
> available offset returned from Kafka is showing Long.MIN_VALUE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26718) structured streaming fetched wrong current offset from kafka

2019-01-29 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-26718:
-

Assignee: Ryne Yang

> structured streaming fetched wrong current offset from kafka
> 
>
> Key: SPARK-26718
> URL: https://issues.apache.org/jira/browse/SPARK-26718
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Ryne Yang
>Assignee: Ryne Yang
>Priority: Major
>
> When running Spark Structured Streaming with `"org.apache.spark" %% 
> "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting an error regarding current 
> offset fetching:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): 
> java.lang.AssertionError: assertion failed: latest offs
> et -9223372036854775808 does not equal -1
> at scala.Predef$.assert(Predef.scala:170)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.(KafkaMicroBatchReader.scala:329)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
> at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> For some reason, it looks like fetchLatestOffset returned Long.MIN_VALUE for 
> one of the partitions. I checked the structured streaming checkpoint and that 
> was correct; it is the currentAvailableOffset that was set to Long.MIN_VALUE.
> kafka broker version: 1.1.0.
> lib we used:
> {{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % 
> "2.4.0"}}
> how to reproduce:
> Basically we started a structured streaming query and subscribed to a topic of 
> 4 partitions, then produced some messages into the topic; the job crashed and 
> logged the stacktrace above.
> also the committed offsets seem fine as we see in the logs: 
> {code:java}
> === Streaming Query ===
> Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 
> 31878627-d473-4ee8-955d-d4d3f3f45eb9]
> Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":1}}}
> Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":-9223372036854775808}}}
> {code}
> So Spark streaming recorded the correct value for partition 0, but the current 
> available offset returned from Kafka is showing Long.MIN_VALUE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26718) Fixed integer overflow in SS kafka rateLimit calculation

2019-01-29 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-26718.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

This is resolved via https://github.com/apache/spark/pull/23666

> Fixed integer overflow in SS kafka rateLimit calculation
> 
>
> Key: SPARK-26718
> URL: https://issues.apache.org/jira/browse/SPARK-26718
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Ryne Yang
>Assignee: Ryne Yang
>Priority: Major
> Fix For: 3.0.0
>
>
> When running Spark Structured Streaming with `"org.apache.spark" %% 
> "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting an error regarding current 
> offset fetching:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): 
> java.lang.AssertionError: assertion failed: latest offs
> et -9223372036854775808 does not equal -1
> at scala.Predef$.assert(Predef.scala:170)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.(KafkaMicroBatchReader.scala:329)
> at 
> org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
> at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> For some reason, it looks like fetchLatestOffset returned Long.MIN_VALUE for 
> one of the partitions. I checked the structured streaming checkpoint and that 
> was correct; it is the currentAvailableOffset that was set to Long.MIN_VALUE.
> kafka broker version: 1.1.0.
> lib we used:
> {{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % 
> "2.4.0"}}
> how to reproduce:
> Basically we started a structured streaming query and subscribed to a topic of 
> 4 partitions, then produced some messages into the topic; the job crashed and 
> logged the stacktrace above.
> also the committed offsets seem fine as we see in the logs: 
> {code:java}
> === Streaming Query ===
> Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 
> 31878627-d473-4ee8-955d-d4d3f3f45eb9]
> Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":1}}}
> Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: 
> {"REVENUEEVENT":{"0":-9223372036854775808}}}
> {code}
> So Spark streaming recorded the correct value for partition 0, but the current 
> available offset returned from Kafka is showing Long.MIN_VALUE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25994) SPIP: Property Graphs, Cypher Queries, and Algorithms

2019-01-29 Thread Saikat Kanjilal (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755272#comment-16755272
 ] 

Saikat Kanjilal commented on SPARK-25994:
-

[~mju] I would like to help out on this issue on the design/implementation side. 
Any thoughts on the best way to get involved? I will review the doc above as a 
first step.

> SPIP: Property Graphs, Cypher Queries, and Algorithms
> -
>
> Key: SPARK-25994
> URL: https://issues.apache.org/jira/browse/SPARK-25994
> Project: Spark
>  Issue Type: New Feature
>  Components: GraphX
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Martin Junghanns
>Priority: Major
>  Labels: SPIP
>
> Copied from the SPIP doc:
> {quote}
> GraphX was one of the foundational pillars of the Spark project, and is the 
> current graph component. This reflects the importance of the graphs data 
> model, which naturally pairs with an important class of analytic function, 
> the network or graph algorithm. 
> However, GraphX is not actively maintained. It is based on RDDs, and cannot 
> exploit Spark 2’s Catalyst query engine. GraphX is only available to Scala 
> users.
> GraphFrames is a Spark package, which implements DataFrame-based graph 
> algorithms, and also incorporates simple graph pattern matching with fixed 
> length patterns (called “motifs”). GraphFrames is based on DataFrames, but 
> has a semantically weak graph data model (based on untyped edges and 
> vertices). The motif pattern matching facility is very limited by comparison 
> with the well-established Cypher language. 
> The Property Graph data model has become quite widespread in recent years, 
> and is the primary focus of commercial graph data management and of graph 
> data research, both for on-premises and cloud data management. Many users of 
> transactional graph databases also wish to work with immutable graphs in 
> Spark.
> The idea is to define a Cypher-compatible Property Graph type based on 
> DataFrames; to replace GraphFrames querying with Cypher; to reimplement 
> GraphX/GraphFrames algos on the PropertyGraph type. 
> To achieve this goal, a core subset of Cypher for Apache Spark (CAPS), 
> reusing existing proven designs and code, will be employed in Spark 3.0. This 
> graph query processor, like CAPS, will overlay and drive the SparkSQL 
> Catalyst query engine, using the CAPS graph query planner.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26774) Document threading concerns in TaskSchedulerImpl

2019-01-29 Thread Imran Rashid (JIRA)
Imran Rashid created SPARK-26774:


 Summary: Document threading concerns in TaskSchedulerImpl
 Key: SPARK-26774
 URL: https://issues.apache.org/jira/browse/SPARK-26774
 Project: Spark
  Issue Type: Improvement
  Components: Scheduler, Spark Core
Affects Versions: 3.0.0
Reporter: Imran Rashid


TaskSchedulerImpl has a bunch of threading concerns, which are not well 
documented -- in fact the docs it has are somewhat misleading.  In particular, 
some of the methods should only be called within the DAGScheduler event loop.

This suggests some potential refactoring to avoid so many mixed concerns inside 
TaskSchedulerImpl, but that's a lot harder to do safely; for now, I just want to 
add some comments.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26765) Avro: Validate input and output schema

2019-01-29 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-26765.
-
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23684
[https://github.com/apache/spark/pull/23684]

> Avro: Validate input and output schema
> --
>
> Key: SPARK-26765
> URL: https://issues.apache.org/jira/browse/SPARK-26765
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Assignee: Gengliang Wang
>Priority: Minor
> Fix For: 3.0.0
>
>
> The API supportDataType in FileFormat helps to validate the output/input 
> schema before execution starts, so that we can avoid some invalid data source 
> IO and users can see clean error messages.
> This PR is to override the validation API in the Avro data source.
> Also, as per the Avro spec (https://avro.apache.org/docs/1.8.2/spec.html), 
> NullType is supported. This PR fixes the handling of NullType.
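
A rough sketch of the shape such a validation hook takes: a recursive check over the schema before any file IO is attempted. The checker below is an assumption for illustration; the exact supportDataType signature and the precise set of types the Avro source accepts are in the merged PR:

{code:scala}
import org.apache.spark.sql.types._

// Hypothetical checker: walk the schema and reject unsupported leaf types up front.
def avroSupports(dataType: DataType): Boolean = dataType match {
  case ArrayType(elementType, _)      => avroSupports(elementType)
  case MapType(keyType, valueType, _) => avroSupports(keyType) && avroSupports(valueType)
  case StructType(fields)             => fields.forall(f => avroSupports(f.dataType))
  case NullType                       => true   // allowed per the Avro spec, as noted above
  case _                              => true   // assume remaining primitive types are fine
}

def verifySchema(schema: StructType): Unit =
  schema.fields.foreach { f =>
    require(avroSupports(f.dataType),
      s"Avro data source does not support ${f.dataType} (column ${f.name})")
  }
{code}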



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26765) Avro: Validate input and output schema

2019-01-29 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-26765:
---

Assignee: Gengliang Wang

> Avro: Validate input and output schema
> --
>
> Key: SPARK-26765
> URL: https://issues.apache.org/jira/browse/SPARK-26765
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Assignee: Gengliang Wang
>Priority: Minor
>
> The API supportDataType in FileFormat helps to validate the output/input 
> schema before execution starts, so that we can avoid some invalid data source 
> IO and users can see clean error messages.
> This PR is to override the validation API in the Avro data source.
> Also, as per the Avro spec (https://avro.apache.org/docs/1.8.2/spec.html), 
> NullType is supported. This PR fixes the handling of NullType.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26752) Multiple aggregate methods in the same column in DataFrame

2019-01-29 Thread Guilherme Beltramini (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755156#comment-16755156
 ] 

Guilherme Beltramini commented on SPARK-26752:
--

Thanks for the input! I also agree with [~mgaido] about deprecating the input 
Map[String, String].

> Multiple aggregate methods in the same column in DataFrame
> --
>
> Key: SPARK-26752
> URL: https://issues.apache.org/jira/browse/SPARK-26752
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Guilherme Beltramini
>Priority: Minor
>
> The agg function in 
> [org.apache.spark.sql.RelationalGroupedDataset|https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.RelationalGroupedDataset]
>  accepts as input:
>  * Column*
>  * Map[String, String]
>  * (String, String)*
> I'm proposing to add Map[String, Seq[String]], where the keys are the columns 
> to aggregate and the values are the aggregation functions to apply. Here 
> is a similar question: 
> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-multiple-agg-on-the-same-column-td29541.html.
> In the example below (running in spark-shell, with Spark 2.4.0), I'm showing 
> a workaround. What I'm proposing is that agg should accept aggMap as input:
> {code:java}
> scala> val df = Seq(("a", 1), ("a", 2), ("a", 3), ("a", 4), ("b", 10), ("b", 
> 20), ("c", 100)).toDF("col1", "col2")
> df: org.apache.spark.sql.DataFrame = [col1: string, col2: int]
> scala> df.show
> +++
> |col1|col2|
> +++
> |   a|   1|
> |   a|   2|
> |   a|   3|
> |   a|   4|
> |   b|  10|
> |   b|  20|
> |   c| 100|
> +++
> scala> val aggMap = Map("col1" -> Seq("count"), "col2" -> Seq("min", "max", 
> "mean"))
> aggMap: scala.collection.immutable.Map[String,Seq[String]] = Map(col1 -> 
> List(count), col2 -> List(min, max, mean))
> scala> val aggSeq = aggMap.toSeq.flatMap{ case (c: String, fns: Seq[String]) 
> => Seq(c).zipAll(fns, c, "") }
> aggSeq: Seq[(String, String)] = ArrayBuffer((col1,count), (col2,min), 
> (col2,max), (col2,mean))
> scala> val dfAgg = df.groupBy("col1").agg(aggSeq.head, aggSeq.tail: _*)
> dfAgg: org.apache.spark.sql.DataFrame = [col1: string, count(col1): bigint 
> ... 3 more fields]
> scala> dfAgg.orderBy("col1").show
> ++---+-+-+-+
> |col1|count(col1)|min(col2)|max(col2)|avg(col2)|
> ++---+-+-+-+
> |   a|  4|1|4|  2.5|
> |   b|  2|   10|   20| 15.0|
> |   c|  1|  100|  100|100.0|
> ++---+-+-+-+
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26772) YARNHadoopDelegationTokenManager should load ServiceCredentialProviders independently

2019-01-29 Thread Gabor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated SPARK-26772:
--
Description: 
YARNHadoopDelegationTokenManager now loads ServiceCredentialProviders in one 
step. The drawback of this is that if one provider fails, none of the others are 
loaded.


> YARNHadoopDelegationTokenManager should load ServiceCredentialProviders 
> independently
> -
>
> Key: SPARK-26772
> URL: https://issues.apache.org/jira/browse/SPARK-26772
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Priority: Major
>
> YARNHadoopDelegationTokenManager now loads ServiceCredentialProviders in one 
> step. The drawback of this is that if one provider fails, none of the others 
> are loaded.
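
A sketch of what loading the providers independently could look like, assuming discovery through java.util.ServiceLoader; the trait and the logging here are simplified placeholders, not the actual Spark classes:

{code:scala}
import java.util.ServiceLoader
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

trait ServiceCredentialProvider { def serviceName: String }   // placeholder for the real trait

def loadProviders(loader: ClassLoader): Seq[ServiceCredentialProvider] = {
  val it = ServiceLoader.load(classOf[ServiceCredentialProvider], loader).iterator()
  val providers = ArrayBuffer.empty[ServiceCredentialProvider]
  while (it.hasNext) {
    try {
      providers += it.next()        // instantiate each provider on its own ...
    } catch {
      case NonFatal(e) =>           // ... so one broken provider does not block the rest
        println(s"Skipping credential provider that failed to load: ${e.getMessage}")
    }
  }
  providers.toSeq
}
{code}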



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26702) Create a test trait for Parquet and Orc test

2019-01-29 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-26702.
---
   Resolution: Fixed
 Assignee: Liang-Chi Hsieh
Fix Version/s: 3.0.0

This is resolved via https://github.com/apache/spark/pull/23628

> Create a test trait for Parquet and Orc test
> 
>
> Key: SPARK-26702
> URL: https://issues.apache.org/jira/browse/SPARK-26702
> Project: Spark
>  Issue Type: Test
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Liang-Chi Hsieh
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 3.0.0
>
>
> To make test suites support both Parquet and ORC by reusing test cases, 
> this patch extracts the testing methods into a trait. For example, if we need to test 
> a common feature shared by Parquet and Orc, we should be able to write test 
> cases once and reuse them to test both formats.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26773) Consider alternative base images for Kubernetes

2019-01-29 Thread Ondrej Kokes (JIRA)
Ondrej Kokes created SPARK-26773:


 Summary: Consider alternative base images for Kubernetes
 Key: SPARK-26773
 URL: https://issues.apache.org/jira/browse/SPARK-26773
 Project: Spark
  Issue Type: Improvement
  Components: Kubernetes, PySpark
Affects Versions: 2.4.0
Reporter: Ondrej Kokes


I understand the desire to make the base image (not just for Kubernetes) 
minimal, and thus the choice of Alpine, but that distro has its limitations. The 
main one is musl as its libc implementation.

The main reason for us not to use Alpine for our non-Spark workloads is that 
we're using Python and *we cannot use pre-built distributions of packages 
(so-called wheels)*, because they are usually built for glibc-based distros 
(work is being done for musl-based builds, but we're not there yet [0]).

So instead of popular packages like numpy or pandas being installed in seconds, 
a build process has to be initiated upon each installation of many packages 
(and that requires a compiler etc.). We could theoretically build all these 
packages into the base image, but that would require multi-step builds so that 
we don't include gcc/clang in the final image, and we would have to rebuild the 
docker image with each dependency change, etc.

There have already been similar issues submitted [1].

*I'm not sure what the best course of action is.* Perhaps there should be an 
alternative, e.g. a Debian-based distro. Or perhaps there is a good reason 
for a glibc-based distro to be the default Docker base image, with an option to 
"downgrade" to Alpine. (I'm guessing that R, with its popular Rcpp-based 
extensions, might suffer from a similar problem, but I'm mostly guessing. [2])

 

[0] https://www.python.org/dev/peps/pep-0513/
[1] https://github.com/apache-spark-on-k8s/spark/issues/326
[2] https://github.com/rocker-org/rocker/issues/231#issuecomment-297150217



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26763) Using fileStatus cache when filterPartitions

2019-01-29 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-26763.
-
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23683
[https://github.com/apache/spark/pull/23683]

> Using fileStatus cache when filterPartitions
> 
>
> Key: SPARK-26763
> URL: https://issues.apache.org/jira/browse/SPARK-26763
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xianyang Liu
>Assignee: Xianyang Liu
>Priority: Major
> Fix For: 3.0.0
>
>
> We should pass the existing `fileStatusCache` to `InMemoryFileIndex` even 
> when there are no partition columns.
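
The change amounts to one extra argument at the call site; a sketch under the assumption that a session-level FileStatusCache is available and with the (internal) InMemoryFileIndex constructor argument list simplified — see PR 23683 for the real call site:

{code:scala}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}

// Reuse the session-level FileStatusCache instead of letting the index fall back to a no-op cache.
def buildIndex(spark: SparkSession, rootPaths: Seq[Path], options: Map[String, String]) = {
  val fileStatusCache = FileStatusCache.getOrCreate(spark)
  new InMemoryFileIndex(spark, rootPaths, options, None, fileStatusCache)
}
{code}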



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-11215) Add multiple columns support to StringIndexer

2019-01-29 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-11215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-11215.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 20146
[https://github.com/apache/spark/pull/20146]

> Add multiple columns support to StringIndexer
> -
>
> Key: SPARK-11215
> URL: https://issues.apache.org/jira/browse/SPARK-11215
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.4.0
>Reporter: Yanbo Liang
>Assignee: Yanbo Liang
>Priority: Major
>  Labels: release-notes
> Fix For: 3.0.0
>
>
> Add multiple-column support to StringIndexer, so that users can transform 
> multiple input columns to multiple output columns simultaneously. See the 
> discussion in SPARK-8418.
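
A short spark-shell style sketch of the multi-column usage this enables; the setter names are assumed to follow the existing multi-column transformers, so verify against the merged PR:

{code:scala}
import org.apache.spark.ml.feature.StringIndexer

val df = Seq(("US", "chrome"), ("US", "firefox"), ("DE", "chrome")).toDF("country", "browser")

val indexer = new StringIndexer()
  .setInputCols(Array("country", "browser"))            // several input columns at once ...
  .setOutputCols(Array("country_idx", "browser_idx"))   // ... mapped to matching output columns
  .setStringOrderType("frequencyDesc")

// Fit once over all columns and transform in a single pass.
val indexed = indexer.fit(df).transform(df)
{code}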



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26771) Make .unpersist(), .destroy() consistently non-blocking by default

2019-01-29 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-26771:
--
Docs Text: The RDD and DataFrame .unpersist() method, and Broadcast 
.destroy() method, take an optional 'blocking' argument. The default was 
'false' in all cases except for (Scala) RDDs and their GraphX subclasses. The 
default is now 'false' (non-blocking) in all of these methods.

> Make .unpersist(), .destroy() consistently non-blocking by default
> --
>
> Key: SPARK-26771
> URL: https://issues.apache.org/jira/browse/SPARK-26771
> Project: Spark
>  Issue Type: Improvement
>  Components: GraphX, Spark Core
>Affects Versions: 2.4.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>  Labels: release-notes
>
> See https://issues.apache.org/jira/browse/SPARK-26728 and 
> https://github.com/apache/spark/pull/23650 . 
> RDD and DataFrame expose an .unpersist() method with optional "blocking" 
> argument. So does Broadcast.destroy(). This argument is false by default 
> except for the Scala RDD (not Pyspark) implementation and its GraphX 
> subclasses. Most usages of these methods request non-blocking behavior 
> already, and indeed, it's not typical to want to wait for the resources to be 
> freed, except in tests asserting behavior about these methods (where blocking 
> is typically requested).
> This proposes to make the default false across these methods, and adjust 
> callers to only request non-default blocking behavior where important, such 
> as in a few key tests. 
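
For reference, the call sites in question, with the blocking flag spelled out (spark-shell style, with `sc` already available; a sketch against the existing public API — after this change the non-blocking form is simply the default):

{code:scala}
val rdd = sc.parallelize(1 to 1000).cache()
rdd.count()                      // materialize the cache
rdd.unpersist(blocking = false)  // return immediately; Scala RDDs currently block by default

val bc = sc.broadcast(Array(1, 2, 3))
bc.destroy()                     // would likewise become non-blocking by default
{code}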



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-11215) Add multiple columns support to StringIndexer

2019-01-29 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-11215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-11215:
--
Docs Text: 
When specifying frequencyDesc or frequencyAsc as stringOrderType param in 
StringIndexer, in case of equal frequency, the order of strings was previously 
undefined. Since Spark 3.0, strings with equal frequency are further
sorted lexicographically.
Affects Version/s: 2.4.0
   Labels: release-notes  (was: )

> Add multiple columns support to StringIndexer
> -
>
> Key: SPARK-11215
> URL: https://issues.apache.org/jira/browse/SPARK-11215
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.4.0
>Reporter: Yanbo Liang
>Assignee: Yanbo Liang
>Priority: Major
>  Labels: release-notes
>
> Add multiple-column support to StringIndexer, so that users can transform 
> multiple input columns to multiple output columns simultaneously. See the 
> discussion in SPARK-8418.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26763) Using fileStatus cache when filterPartitions

2019-01-29 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-26763:
---

Assignee: Xianyang Liu

> Using fileStatus cache when filterPartitions
> 
>
> Key: SPARK-26763
> URL: https://issues.apache.org/jira/browse/SPARK-26763
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xianyang Liu
>Assignee: Xianyang Liu
>Priority: Major
>
> We should pass the existing `fileStatusCache` to `InMemoryFileIndex` even 
> when there are no partition columns.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24959) Do not invoke the CSV/JSON parser for empty schema

2019-01-29 Thread Sean Owen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755088#comment-16755088
 ] 

Sean Owen commented on SPARK-24959:
---

Looks like this may need to be reverted: 
https://issues.apache.org/jira/browse/SPARK-26745

> Do not invoke the CSV/JSON parser for empty schema
> --
>
> Key: SPARK-24959
> URL: https://issues.apache.org/jira/browse/SPARK-24959
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Maxim Gekk
>Assignee: Maxim Gekk
>Priority: Major
> Fix For: 2.4.0
>
>
> Currently the JSON and CSV parsers are called even if the required schema is 
> empty. Invoking the parser for each line has some non-zero overhead, and it can 
> be skipped. Such an optimization should speed up count(), for example.
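
The idea in one small sketch: when the required schema is empty (as in a bare count()), each input line can be turned into an empty row without ever touching the text parser. The names here are illustrative, not the actual Spark internals:

{code:scala}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

def parseFile(lines: Iterator[String],
              requiredSchema: StructType,
              parse: String => Iterator[InternalRow]): Iterator[InternalRow] =
  if (requiredSchema.isEmpty) {
    lines.map(_ => InternalRow.empty)   // no columns requested: skip the CSV/JSON parser entirely
  } else {
    lines.flatMap(parse)                // normal path: parse every line
  }
{code}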



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26771) Make .unpersist(), .destroy() consistently non-blocking by default

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26771:


Assignee: Apache Spark  (was: Sean Owen)

> Make .unpersist(), .destroy() consistently non-blocking by default
> --
>
> Key: SPARK-26771
> URL: https://issues.apache.org/jira/browse/SPARK-26771
> Project: Spark
>  Issue Type: Improvement
>  Components: GraphX, Spark Core
>Affects Versions: 2.4.0
>Reporter: Sean Owen
>Assignee: Apache Spark
>Priority: Major
>  Labels: release-notes
>
> See https://issues.apache.org/jira/browse/SPARK-26728 and 
> https://github.com/apache/spark/pull/23650 . 
> RDD and DataFrame expose an .unpersist() method with optional "blocking" 
> argument. So does Broadcast.destroy(). This argument is false by default 
> except for the Scala RDD (not Pyspark) implementation and its GraphX 
> subclasses. Most usages of these methods request non-blocking behavior 
> already, and indeed, it's not typical to want to wait for the resources to be 
> freed, except in tests asserting behavior about these methods (where blocking 
> is typically requested).
> This proposes to make the default false across these methods, and adjust 
> callers to only request non-default blocking behavior where important, such 
> as in a few key tests. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26772) YARNHadoopDelegationTokenManager should load ServiceCredentialProviders independently

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26772:


Assignee: Apache Spark

> YARNHadoopDelegationTokenManager should load ServiceCredentialProviders 
> independently
> -
>
> Key: SPARK-26772
> URL: https://issues.apache.org/jira/browse/SPARK-26772
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Assignee: Apache Spark
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26772) YARNHadoopDelegationTokenManager should load ServiceCredentialProviders independently

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26772:


Assignee: (was: Apache Spark)

> YARNHadoopDelegationTokenManager should load ServiceCredentialProviders 
> independently
> -
>
> Key: SPARK-26772
> URL: https://issues.apache.org/jira/browse/SPARK-26772
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26772) YARNHadoopDelegationTokenManager should load ServiceCredentialProviders independently

2019-01-29 Thread Gabor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated SPARK-26772:
--
Component/s: (was: Spark Core)
 YARN

> YARNHadoopDelegationTokenManager should load ServiceCredentialProviders 
> independently
> -
>
> Key: SPARK-26772
> URL: https://issues.apache.org/jira/browse/SPARK-26772
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26771) Make .unpersist(), .destroy() consistently non-blocking by default

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26771:


Assignee: Sean Owen  (was: Apache Spark)

> Make .unpersist(), .destroy() consistently non-blocking by default
> --
>
> Key: SPARK-26771
> URL: https://issues.apache.org/jira/browse/SPARK-26771
> Project: Spark
>  Issue Type: Improvement
>  Components: GraphX, Spark Core
>Affects Versions: 2.4.0
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Major
>  Labels: release-notes
>
> See https://issues.apache.org/jira/browse/SPARK-26728 and 
> https://github.com/apache/spark/pull/23650 . 
> RDD and DataFrame expose an .unpersist() method with optional "blocking" 
> argument. So does Broadcast.destroy(). This argument is false by default 
> except for the Scala RDD (not Pyspark) implementation and its GraphX 
> subclasses. Most usages of these methods request non-blocking behavior 
> already, and indeed, it's not typical to want to wait for the resources to be 
> freed, except in tests asserting behavior about these methods (where blocking 
> is typically requested).
> This proposes to make the default false across these methods, and adjust 
> callers to only request non-default blocking behavior where important, such 
> as in a few key tests. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26772) YARNHadoopDelegationTokenManager should load ServiceCredentialProviders independently

2019-01-29 Thread Gabor Somogyi (JIRA)
Gabor Somogyi created SPARK-26772:
-

 Summary: YARNHadoopDelegationTokenManager should load 
ServiceCredentialProviders independently
 Key: SPARK-26772
 URL: https://issues.apache.org/jira/browse/SPARK-26772
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Gabor Somogyi






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26728) Make rdd.unpersist blocking configurable

2019-01-29 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-26728.
---
Resolution: Won't Fix

Closing this in favor of https://issues.apache.org/jira/browse/SPARK-26771

> Make rdd.unpersist blocking configurable
> 
>
> Key: SPARK-26728
> URL: https://issues.apache.org/jira/browse/SPARK-26728
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, rdd.unpersist's blocking argument is set to true by default. 
> However, in an actual production cluster (especially a large one), node loss or 
> network issues can always happen.
> Users generally treat rdd.unpersist as non-exceptional, so the blocking 
> unpersist may sometimes cause a user's job to fail; this has happened many 
> times in our cluster.
> {code:java}
> 2018-05-16,13:28:33,489 WARN org.apache.spark.storage.BlockManagerMaster: 
> Failed to remove RDD 15 - Failed to send RPC 7571440800577648876 to 
> c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
> java.io.IOException: Failed to send RPC 7571440800577648876 to 
> c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
>   at 
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
>   at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
> 2018-05-16,13:28:33,489 ERROR org.apache.spark.deploy.yarn.ApplicationMaster: 
> User class threw exception: java.io.IOException: Failed to send RPC 
> 7571440800577648876 to c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
> java.io.IOException: Failed to send RPC 7571440800577648876 to 
> c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
>   at 
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
>   at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
>   at 
> 

[jira] [Comment Edited] (SPARK-26728) Make rdd.unpersist blocking configurable

2019-01-29 Thread Sean Owen (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755069#comment-16755069
 ] 

Sean Owen edited comment on SPARK-26728 at 1/29/19 2:45 PM:


Closing this in favor of https://issues.apache.org/jira/browse/SPARK-26771


was (Author: srowen):
Closing this in favor of https://issues.apache.org/jira/browse/SPARK-26728

> Make rdd.unpersist blocking configurable
> 
>
> Key: SPARK-26728
> URL: https://issues.apache.org/jira/browse/SPARK-26728
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, rdd.unpersist's blocking argument is set to true by default. 
> However, in an actual production cluster (especially a large one), node loss or 
> network issues can always happen.
> Users generally treat rdd.unpersist as non-exceptional, so the blocking 
> unpersist may sometimes cause a user's job to fail; this has happened many 
> times in our cluster.
> {code:java}
> 2018-05-16,13:28:33,489 WARN org.apache.spark.storage.BlockManagerMaster: 
> Failed to remove RDD 15 - Failed to send RPC 7571440800577648876 to 
> c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
> java.io.IOException: Failed to send RPC 7571440800577648876 to 
> c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
>   at 
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
>   at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
> 2018-05-16,13:28:33,489 ERROR org.apache.spark.deploy.yarn.ApplicationMaster: 
> User class threw exception: java.io.IOException: Failed to send RPC 
> 7571440800577648876 to c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
> java.io.IOException: Failed to send RPC 7571440800577648876 to 
> c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
>   at 
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
>   at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
>   

[jira] [Created] (SPARK-26771) Make .unpersist(), .destroy() consistently non-blocking by default

2019-01-29 Thread Sean Owen (JIRA)
Sean Owen created SPARK-26771:
-

 Summary: Make .unpersist(), .destroy() consistently non-blocking 
by default
 Key: SPARK-26771
 URL: https://issues.apache.org/jira/browse/SPARK-26771
 Project: Spark
  Issue Type: Improvement
  Components: GraphX, Spark Core
Affects Versions: 2.4.0
Reporter: Sean Owen
Assignee: Sean Owen


See https://issues.apache.org/jira/browse/SPARK-26728 and 
https://github.com/apache/spark/pull/23650 . 

RDD and DataFrame expose an .unpersist() method with optional "blocking" 
argument. So does Broadcast.destroy(). This argument is false by default except 
for the Scala RDD (not Pyspark) implementation and its GraphX subclasses. Most 
usages of these methods request non-blocking behavior already, and indeed, it's 
not typical to want to wait for the resources to be freed, except in tests 
asserting behavior about these methods (where blocking is typically requested).

This proposes to make the default false across these methods, and adjust 
callers to only request non-default blocking behavior where important, such as 
in a few key tests. 
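
For illustration, a minimal sketch of the call sites affected (assuming an active 
SparkContext {{sc}}; the defaults noted in the comments describe the proposal above, 
not necessarily the current behavior of every class):

{code:java}
// Sketch of the affected call sites. The `blocking` defaults in the comments
// reflect the proposed behavior, not necessarily today's behavior everywhere.
val rdd = sc.parallelize(1 to 100).cache()
rdd.count()
rdd.unpersist()                 // proposed default: return immediately (blocking = false)
rdd.unpersist(blocking = true)  // tests asserting on storage status can still opt in

val bc = sc.broadcast(Map("a" -> 1))
bc.destroy()                    // the same proposal applies to Broadcast.destroy()
{code}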



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26727) CREATE OR REPLACE VIEW query fails with TableAlreadyExistsException

2019-01-29 Thread Bela Kovacs (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755065#comment-16755065
 ] 

Bela Kovacs commented on SPARK-26727:
-

I could reproduce it with Databricks, although it took some time:

while (true) spark.sql("create or replace view aaa as select * from aa")

org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
AlreadyExistsException(message:Table aaa already exists);
 at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$withClient$1$$anonfun$apply$1.apply(HiveExternalCatalog.scala:150)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog.org$apache$spark$sql$hive$HiveExternalCatalog$$maybeSynchronized(HiveExternalCatalog.scala:104)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$withClient$1.apply(HiveExternalCatalog.scala:139)
 at 
com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:330)
 at 
com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:316)
 at 
com.databricks.spark.util.SparkDatabricksProgressReporter$.withStatusCode(ProgressReporter.scala:23)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:137)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:298)
 at 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:99)
 at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:341)
 at 
com.databricks.sql.DatabricksSessionCatalog.createTable(DatabricksSessionCatalog.scala:125)
 at 
org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:176)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
 at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:81)
 at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:205)
 at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:205)
 at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3423)
 at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:91)
 at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:227)
 at 
org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:86)
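
For completeness, the repro above as a self-contained loop (a sketch only; it assumes a 
Hive-enabled SparkSession {{spark}} and an existing table {{aa}}, as in the snippet at 
the top of this comment):

{code:java}
// Repro sketch: re-run CREATE OR REPLACE VIEW until the metastore race surfaces.
// Assumes a Hive-enabled SparkSession `spark` and an existing table `aa`.
var i = 0L
while (true) {
  spark.sql("create or replace view aaa as select * from aa")
  i += 1
  if (i % 1000 == 0) println(s"$i iterations without hitting AlreadyExistsException")
}
{code}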

> CREATE OR REPLACE VIEW query fails with TableAlreadyExistsException
> ---
>
> Key: SPARK-26727
> URL: https://issues.apache.org/jira/browse/SPARK-26727
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Srinivas Yarra
>Priority: Major
>
> We experienced that sometimes the Hive query "CREATE OR REPLACE VIEW <view name> AS SELECT <columns> FROM <table>" fails with the following exception:
> {code:java}
> // code placeholder
> org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: Table or 
> view '' already exists in database 'default'; at 
> org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:314)
>  at 
> org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:165) 
> at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>  at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
>  at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195) at 
> org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195) at 
> org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365) at 
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364) at 
> org.apache.spark.sql.Dataset.(Dataset.scala:195) at 
> org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:80) at 
> org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642) ... 49 elided
> {code}
> {code}
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res1: org.apache.spark.sql.DataFrame = []
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy 
> FROM ae_dual") res2: org.apache.spark.sql.DataFrame = [] 
> scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as 

[jira] [Updated] (SPARK-26739) Standardized Join Types for DataFrames

2019-01-29 Thread Skyler Lehan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Skyler Lehan updated SPARK-26739:
-
Description: 
h3. *Q1.* What are you trying to do? Articulate your objectives using 
absolutely no jargon.

Currently, in the join functions on 
[DataFrames|http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset],
 the join types are defined via a string parameter called joinType. In order 
for a developer to know which joins are possible, they must look up the API 
call for join. While this works fine, it can cause the developer to make a typo 
resulting in improper joins and/or unexpected errors that aren't evident at 
compile time. The objective of this improvement would be to allow developers to 
use a common definition for join types (by enum or constants) called JoinTypes. 
This would contain the possible joins and remove the possibility of a typo. It 
would also allow Spark to alter the names of the joins in the future without 
impacting end-users.
h3. *Q2.* What problem is this proposal NOT designed to solve?

The problem this solves is extremely narrow, it would not solve anything other 
than providing a common definition for join types.
h3. *Q3.* How is it done today, and what are the limits of current practice?

Currently, developers must join two DataFrames like so:
{code:java}
val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), "left_outer")
{code}
Where they manually type the join type. As stated above, this:
 * Requires developers to manually type in the join
 * Creates the possibility of typos
 * Restricts renaming of join types, as it's a literal string
 * Does not restrict and/or compile-check the join type being used, leading to 
runtime errors

h3. *Q4.* What is new in your approach and why do you think it will be 
successful?

The new approach would use constants or *more preferably an enum*, something 
like this:
{code:java}
val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), 
JoinType.LEFT_OUTER)
{code}
This would provide:
 * In code reference/definitions of the possible join types
 ** This subsequently allows the addition of scaladoc of what each join type 
does and how it operates
 * Removes possibilities of a typo on the join type
 * Provides compile time checking of the join type (only if an enum is used)

To clarify, if JoinType is a constant, it would just fill in the joinType 
string parameter for users. If an enum is used, it would restrict the domain of 
possible join types to whatever is defined in the future JoinType enum. The 
enum is preferred, however it would take longer to implement.
h3. *Q5.* Who cares? If you are successful, what difference will it make?

Developers using Apache Spark will care. This will make the join function 
easier to wield and lead to fewer runtime errors. It will save time by bringing 
join type validation to compile time. It will also provide an in-code reference 
to the join types, which saves the developer the time of having to look up and 
navigate the multiple join functions to find the possible join types. In 
addition, the resulting constants/enum would have documentation on how 
each join type works.
h3. *Q6.* What are the risks?

Users of Apache Spark who currently use strings to define their join types 
could be impacted if an enum is chosen as the common definition. This risk can 
be mitigated by using string constants. The string constants would be the exact 
same string as the string literals used today. For example:
{code:java}
JoinType.INNER = "inner"
{code}
If an enum is still the preferred way of defining the join types, new join 
functions could be added that take in these enums and the join calls that 
contain string parameters for joinType could be deprecated. This would give 
developers a chance to change over to the new join types.
h3. *Q7.* How long will it take?

A few days for a seasoned Spark developer.
h3. *Q8.* What are the mid-term and final "exams" to check for success?

Mid-term exam would be the addition of a common definition of the join types 
and additional join functions that take in the join type enum/constant. The 
final exam would be working tests written to check the functionality of these 
new join functions and the join functions that take a string for joinType would 
be deprecated.
h3. *Appendix A.* Proposed API Changes. Optional section defining APIs changes, 
if any. Backward and forward compatibility must be taken into account.

{color:#FF}*It is heavily recommended that enums, and not string constants 
are used.*{color} String constants are presented as a possible solution but not 
the ideal solution.

*If enums are used (preferred):*

The following join function signatures would be added to the Dataset API:
{code:java}
def join(right: Dataset[_], joinExprs: Column, joinType: JoinType): DataFrame
def join(right: Dataset[_], usingColumns: 

[jira] [Commented] (SPARK-26739) Standardized Join Types for DataFrames

2019-01-29 Thread Skyler Lehan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755056#comment-16755056
 ] 

Skyler Lehan commented on SPARK-26739:
--

While constants are possible, they're not ideal. The reason being, a string 
constant, as opposed to an enum, can't be compile-time checked to make sure it's 
a valid join.
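
For concreteness, a rough Scala sketch of what such a definition could look like. The 
names below are illustrative only and are not part of the current public Dataset API 
(any similarly named internal Catalyst classes are unrelated to this sketch):

{code:java}
// Illustrative only -- not part of the Spark API. A sealed hierarchy gives
// exhaustiveness and compile-time checking that a plain string cannot.
sealed abstract class JoinType(val sql: String)
object JoinType {
  case object Inner      extends JoinType("inner")
  case object LeftOuter  extends JoinType("left_outer")
  case object RightOuter extends JoinType("right_outer")
  case object FullOuter  extends JoinType("full_outer")
  case object LeftSemi   extends JoinType("left_semi")
  case object LeftAnti   extends JoinType("left_anti")
  case object Cross      extends JoinType("cross")
}

// A new overload could simply delegate to the existing string-based join:
//   def join(right: Dataset[_], joinExprs: Column, joinType: JoinType): DataFrame =
//     join(right, joinExprs, joinType.sql)
{code}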

> Standardized Join Types for DataFrames
> --
>
> Key: SPARK-26739
> URL: https://issues.apache.org/jira/browse/SPARK-26739
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Skyler Lehan
>Priority: Minor
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> h3. *Q1.* What are you trying to do? Articulate your objectives using 
> absolutely no jargon.
> Currently, in the join functions on 
> [DataFrames|http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset],
>  the join types are defined via a string parameter called joinType. In order 
> for a developer to know which joins are possible, they must look up the API 
> call for join. While this works fine, it can cause the developer to make a 
> typo resulting in improper joins and/or unexpected errors that aren't evident 
> at compile time. The objective of this improvement would be to allow 
> developers to use a common definition for join types (by enum or constants) 
> called JoinTypes. This would contain the possible joins and remove the 
> possibility of a typo. It would also allow Spark to alter the names of the 
> joins in the future without impacting end-users.
> h3. *Q2.* What problem is this proposal NOT designed to solve?
> The problem this solves is extremely narrow, it would not solve anything 
> other than providing a common definition for join types.
> h3. *Q3.* How is it done today, and what are the limits of current practice?
> Currently, developers must join two DataFrames like so:
> {code:java}
> val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), 
> "left_outer")
> {code}
> Where they manually type the join type. As stated above, this:
>  * Requires developers to manually type in the join
>  * Creates the possibility of typos
>  * Restricts renaming of join types, as it's a literal string
>  * Does not restrict and/or compile-check the join type being used, leading 
> to runtime errors
> h3. *Q4.* What is new in your approach and why do you think it will be 
> successful?
> The new approach would use constants, something like this:
> {code:java}
> val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), 
> JoinType.LEFT_OUTER)
> {code}
> This would provide:
>  * In code reference/definitions of the possible join types
>  ** This subsequently allows the addition of scaladoc of what each join type 
> does and how it operates
>  * Removes possibilities of a typo on the join type
>  * Provides compile time checking of the join type (only if an enum is used)
> To clarify, if JoinType is a constant, it would just fill in the joinType 
> string parameter for users. If an enum is used, it would restrict the domain 
> of possible join types to whatever is defined in the future JoinType enum. 
> The enum is preferred, however it would take longer to implement.
> h3. *Q5.* Who cares? If you are successful, what difference will it make?
> Developers using Apache Spark will care. This will make the join function 
> easier to wield and lead to less runtime errors. It will save time by 
> bringing join type validation at compile time. It will also provide in code 
> reference to the join types, which saves the developer time of having to look 
> up and navigate the multiple join functions to find the possible join types. 
> In addition to that, the resulting constants/enum would have documentation on 
> how that join type works.
> h3. *Q6.* What are the risks?
> Users of Apache Spark who currently use strings to define their join types 
> could be impacted if an enum is chosen as the common definition. This risk 
> can be mitigated by using string constants. The string constants would be the 
> exact same string as the string literals used today. For example:
> {code:java}
> JoinType.INNER = "inner"
> {code}
> If an enum is still the preferred way of defining the join types, new join 
> functions could be added that take in these enums and the join calls that 
> contain string parameters for joinType could be deprecated. This would give 
> developers a chance to change over to the new join types.
> h3. *Q7.* How long will it take?
> A few days for a seasoned Spark developer.
> h3. *Q8.* What are the mid-term and final "exams" to check for success?
> Mid-term exam would be the addition of a common definition of the join types 
> and additional join functions that take in the join type enum/constant. The 
> final exam 

[jira] [Updated] (SPARK-26766) Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens

2019-01-29 Thread Gabor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated SPARK-26766:
--
Priority: Minor  (was: Major)

> Remove the list of filesystems from 
> HadoopDelegationTokenProvider.obtainDelegationTokens
> 
>
> Key: SPARK-26766
> URL: https://issues.apache.org/jira/browse/SPARK-26766
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Priority: Minor
>
> This was discussed in previous PR 
> [here|https://github.com/apache/spark/pull/23499/files#diff-406f99efa37915001b613de47815e25cR54].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26770) Misleading/unhelpful error message when wrapping a null in an Option

2019-01-29 Thread sam (JIRA)
sam created SPARK-26770:
---

 Summary: Misleading/unhelpful error message when wrapping a null 
in an Option
 Key: SPARK-26770
 URL: https://issues.apache.org/jira/browse/SPARK-26770
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.2
Reporter: sam


This

{code}

// Using options to indicate nullable fields
case class Product(productID: Option[Int],
   productName: Option[String])

val productExtract: Dataset[Product] =
spark.createDataset(Seq(
  Product(
productID = Some(6050286),
// user mistake here, should be `None` not `Some(null)`
productName = Some(null)
  )))

productExtract.count()

{code}

will give an error like the one below. This error is thrown from quite deep 
down, but there should be some handling logic further up to check for nulls and 
give a more informative error message. E.g. it could tell the user which 
field is null, or it could detect the `Some(null)` mistake and suggest using `None`.

Whatever the exception is, it shouldn't be an NPE; since this is clearly a user 
error, it should be some kind of user-error exception.

{code}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 9 in stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 
1.0 (TID 276, 10.139.64.8, executor 1): java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:194)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:620)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:384)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

{code}
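
For reference, the safe pattern is {{Option(...)}}, which maps null to None; a minimal 
sketch next to the mistake above:

{code:java}
// Option(x) returns None when x is null, which is what the user almost certainly wanted.
val rawName: String = null
val ok: Option[String]     = Option(rawName) // None -- encodes cleanly as a null field
val broken: Option[String] = Some(rawName)   // Some(null) -- blows up later in the encoder
{code}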

I've seen quite a few other people with this error, but I don't think it's for 
the same reason:

https://docs.databricks.com/spark/latest/data-sources/tips/redshift-npe.html
https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/Dt6ilC9Dn54
https://issues.apache.org/jira/browse/SPARK-17195
https://issues.apache.org/jira/browse/SPARK-18859
https://github.com/datastax/spark-cassandra-connector/issues/1062
https://stackoverflow.com/questions/39875711/spark-sql-2-0-nullpointerexception-with-a-valid-postgresql-query



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

2019-01-29 Thread Takeshi Yamamuro (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Takeshi Yamamuro resolved SPARK-26708.
--
   Resolution: Fixed
Fix Version/s: 3.0.0
   2.4.1

Resolved by https://github.com/apache/spark/pull/23644

> Incorrect result caused by inconsistency between a SQL cache's cached RDD and 
> its physical plan
> ---
>
> Key: SPARK-26708
> URL: https://issues.apache.org/jira/browse/SPARK-26708
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xiao Li
>Assignee: Maryann Xue
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.4.1, 3.0.0
>
>
> When performing non-cascading cache invalidation, {{recache}} is called on 
> the other cache entries which are dependent on the cache being invalidated. 
> It leads to the physical plans of those cache entries being re-compiled. 
> For those cache entries, if the cache RDD has already been persisted, chances 
> are there will be inconsistency between the data and the new plan. It can 
> cause a correctness issue if the new plan's {{outputPartitioning}} or 
> {{outputOrdering}} is different from that of the actual data, and 
> meanwhile the cache is used by another query that asks for specific 
> {{outputPartitioning}} or {{outputOrdering}} which happens to match the new 
> plan but not the actual data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26769) partition pruning in inner join

2019-01-29 Thread nhufas (JIRA)
nhufas created SPARK-26769:
--

 Summary: partition pruning in inner join
 Key: SPARK-26769
 URL: https://issues.apache.org/jira/browse/SPARK-26769
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: nhufas


When joining a partitioned parquet table with another table by the partition 
column, it should prune partitions from the partitioned table based on the other 
table's values.

example:

tableA: parquet table partitioned by part_filter

tableB: table with a column holding partition values

 

tableA is partitioned by part_A,part_B,part_C,part_D

tableB is a single column with 2 rows having part_A and part_B as values.

 

doing 

select * from tableA inner join tableB on tableA.part_filter=tableB.part_filter

should generate partition pruning on tableA based on tableB values (in this 
case scanning only 2 partitions), but it will read all 4 partitions from tableA 
and only filter the results.

 

note: this kind of approach works on Hive (filtering tableA partitions)
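
For reference, a sketch of the workaround commonly used today (not the proposed 
optimizer change): collect tableB's values on the driver and turn them into a static 
predicate, which partition pruning can already handle. This assumes a SparkSession 
{{spark}} and a small tableB.

{code:java}
import org.apache.spark.sql.functions.col

// Workaround sketch: materialize the (small) set of partition values from tableB
// and push them as a static IN filter, which tableA's partition pruning can use.
val parts: Array[String] = spark.table("tableB")
  .select("part_filter")
  .collect()
  .map(_.getString(0))

val pruned = spark.table("tableA")
  .where(col("part_filter").isin(parts: _*))
{code}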



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26765) Avro: Validate input and output schema

2019-01-29 Thread Gengliang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-26765:
---
Summary: Avro: Validate input and output schema  (was: Implement 
supportDataType API in Avro data source)

> Avro: Validate input and output schema
> --
>
> Key: SPARK-26765
> URL: https://issues.apache.org/jira/browse/SPARK-26765
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26765) Avro: Validate input and output schema

2019-01-29 Thread Gengliang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-26765:
---
Description: 
The API supportDataType in FileFormat helps to validate the output/input schema 
before execution starts, so that we can avoid some invalid data source IO and 
users can see clean error messages.

This PR overrides the validation API in the Avro data source.
Also, as per the Avro spec (https://avro.apache.org/docs/1.8.2/spec.html), 
NullType is supported. This PR fixes the handling of NullType.
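
As a user-level illustration of the NullType case, a sketch (it assumes the spark-avro 
module is on the classpath so that format "avro" resolves):

{code:java}
// A column produced from a bare NULL literal has NullType; per the Avro spec it can be
// written as the Avro "null" type, so this should either succeed or fail up front with
// a clear schema-validation message rather than deep inside the writer.
val df = spark.range(3).selectExpr("id", "null AS always_null")
df.write.format("avro").save("/tmp/avro_nulltype_demo")
{code}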



> Avro: Validate input and output schema
> --
>
> Key: SPARK-26765
> URL: https://issues.apache.org/jira/browse/SPARK-26765
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Priority: Minor
>
> The API supportDataType in FileFormat helps to validate the output/input 
> schema before execution starts, so that we can avoid some invalid data source 
> IO and users can see clean error messages.
> This PR overrides the validation API in the Avro data source.
> Also, as per the Avro spec (https://avro.apache.org/docs/1.8.2/spec.html), 
> NullType is supported. This PR fixes the handling of NullType.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26768) Remove useless code in BlockManager

2019-01-29 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26768:
-
Description: 
Recently, when I was reading some code of `BlockManager.getBlockData`, I found 
some useless code that can never be reached. The related code is as 
below:

 
{code:java}
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {

shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
getLocalBytes(blockId) match {
  case Some(blockData) =>
new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
true)
  case None =>
// If this block manager receives a request for a block that it doesn't 
have then it's
// likely that the master has outdated block statuses for this block. 
Therefore, we send
// an RPC so that this block is marked as being unavailable from this 
block manager.
reportBlockStatus(blockId, BlockStatus.empty)
throw new BlockNotFoundException(blockId.toString)
}
  }
}
{code}
{code:java}
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // As an optimization for map output fetches, if the block is for a shuffle, 
return it
  // without acquiring a lock; the disk store never deletes (recent) items so 
this should work
  if (blockId.isShuffle) {
val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not 
available. Currently
// downstream code will throw an exception.
val buf = new ChunkedByteBuffer(
  
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
Some(new ByteBufferBlockData(buf, true))
  } else {
blockInfoManager.lockForReading(blockId).map { info => 
doGetLocalBytes(blockId, info) }
  }
}
{code}
`blockId.isShuffle` is checked twice; however, in the method calling hierarchy 
of `BlockManager.getLocalBytes`, the only other callsite of 
`BlockManager.getLocalBytes` is at `TorrentBroadcast.readBlocks`, where the 
blockId can never be a `ShuffleBlockId`.

  !Selection_037.jpg!

So I think we should remove this useless code to make it easier to read.

 

  was:
Recently, when I was reading some code of `BlockManager.getBlockData`, I found 
some useless code that can never be reached. The related code is as 
below:

 
{code:java}
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {

shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
getLocalBytes(blockId) match {
  case Some(blockData) =>
new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
true)
  case None =>
// If this block manager receives a request for a block that it doesn't 
have then it's
// likely that the master has outdated block statuses for this block. 
Therefore, we send
// an RPC so that this block is marked as being unavailable from this 
block manager.
reportBlockStatus(blockId, BlockStatus.empty)
throw new BlockNotFoundException(blockId.toString)
}
  }
}
{code}
{code:java}
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // As an optimization for map output fetches, if the block is for a shuffle, 
return it
  // without acquiring a lock; the disk store never deletes (recent) items so 
this should work
  if (blockId.isShuffle) {
val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not 
available. Currently
// downstream code will throw an exception.
val buf = new ChunkedByteBuffer(
  
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
Some(new ByteBufferBlockData(buf, true))
  } else {
blockInfoManager.lockForReading(blockId).map { info => 
doGetLocalBytes(blockId, info) }
  }
}
{code}
`blockId.isShuffle` is checked twice; however, in the method calling hierarchy 
of `BlockManager.getLocalBytes`, the only other callsite of 
`BlockManager.getLocalBytes` is at `TorrentBroadcast.readBlocks`, where the 
blockId can never be a `ShuffleBlockId`.

 

So I think we should remove this useless code to make it easier to read.

 


> Remove useless code in BlockManager
> ---
>
> Key: SPARK-26768
> URL: https://issues.apache.org/jira/browse/SPARK-26768
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_037.jpg
>
>
> Recently, when I was 

[jira] [Updated] (SPARK-26768) Remove useless code in BlockManager

2019-01-29 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26768:
-
Attachment: Selection_037.jpg

> Remove useless code in BlockManager
> ---
>
> Key: SPARK-26768
> URL: https://issues.apache.org/jira/browse/SPARK-26768
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_037.jpg
>
>
> Recently, when I was reading some code of `BlockManager.getBlockData`, I 
> found some useless code that can never be reached. The related code 
> is as below:
>  
> {code:java}
> override def getBlockData(blockId: BlockId): ManagedBuffer = {
>   if (blockId.isShuffle) {
> 
> shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
>   } else {
> getLocalBytes(blockId) match {
>   case Some(blockData) =>
> new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
> true)
>   case None =>
> // If this block manager receives a request for a block that it 
> doesn't have then it's
> // likely that the master has outdated block statuses for this block. 
> Therefore, we send
> // an RPC so that this block is marked as being unavailable from this 
> block manager.
> reportBlockStatus(blockId, BlockStatus.empty)
> throw new BlockNotFoundException(blockId.toString)
> }
>   }
> }
> {code}
> {code:java}
> def getLocalBytes(blockId: BlockId): Option[BlockData] = {
>   logDebug(s"Getting local block $blockId as bytes")
>   // As an optimization for map output fetches, if the block is for a 
> shuffle, return it
>   // without acquiring a lock; the disk store never deletes (recent) items so 
> this should work
>   if (blockId.isShuffle) {
> val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
> // TODO: This should gracefully handle case where local block is not 
> available. Currently
> // downstream code will throw an exception.
> val buf = new ChunkedByteBuffer(
>   
> shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
> Some(new ByteBufferBlockData(buf, true))
>   } else {
> blockInfoManager.lockForReading(blockId).map { info => 
> doGetLocalBytes(blockId, info) }
>   }
> }
> {code}
> `blockId.isShuffle` is checked twice; however, in the method calling hierarchy 
> of `BlockManager.getLocalBytes`, the only other callsite of 
> `BlockManager.getLocalBytes` is at 
> `TorrentBroadcast.readBlocks`, where the blockId can never be a 
> `ShuffleBlockId`.
>  
> So I think we should remove this useless code to make it easier to read.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26768) Remove useless code in BlockManager

2019-01-29 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26768:


 Summary: Remove useless code in BlockManager
 Key: SPARK-26768
 URL: https://issues.apache.org/jira/browse/SPARK-26768
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: liupengcheng


Recently, when I was reading some code of `BlockManager.getBlockData`, I found 
some useless code that can never be reached. The related code is as 
below:

 
{code:java}
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {

shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
getLocalBytes(blockId) match {
  case Some(blockData) =>
new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
true)
  case None =>
// If this block manager receives a request for a block that it doesn't 
have then it's
// likely that the master has outdated block statuses for this block. 
Therefore, we send
// an RPC so that this block is marked as being unavailable from this 
block manager.
reportBlockStatus(blockId, BlockStatus.empty)
throw new BlockNotFoundException(blockId.toString)
}
  }
}
{code}
{code:java}
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // As an optimization for map output fetches, if the block is for a shuffle, 
return it
  // without acquiring a lock; the disk store never deletes (recent) items so 
this should work
  if (blockId.isShuffle) {
val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not 
available. Currently
// downstream code will throw an exception.
val buf = new ChunkedByteBuffer(
  
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
Some(new ByteBufferBlockData(buf, true))
  } else {
blockInfoManager.lockForReading(blockId).map { info => 
doGetLocalBytes(blockId, info) }
  }
}
{code}
`blockId.isShuffle` is checked twice; however, in the method calling hierarchy 
of `BlockManager.getLocalBytes`, the only other callsite of 
`BlockManager.getLocalBytes` is at `TorrentBroadcast.readBlocks`, where the 
blockId can never be a `ShuffleBlockId`.

 

So I think we should remove this useless code to make it easier to read.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result

2019-01-29 Thread Marco Gaido (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754882#comment-16754882
 ] 

Marco Gaido edited comment on SPARK-26767 at 1/29/19 11:13 AM:
---

IIRC a similar JIRA was reported: SPARK-25420. Please check the comments there; 
your case may be the same.


was (Author: mgaido):
IIRC a similar JIRA was reported. Could you please try a newer version 
(ideally the current branch-2.3)? This may have been fixed.

> Filter on a dropDuplicates dataframe gives inconsistency result
> ---
>
> Key: SPARK-26767
> URL: https://issues.apache.org/jira/browse/SPARK-26767
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.
>Reporter: Jeffrey
>Priority: Major
>
> To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result

2019-01-29 Thread Marco Gaido (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754882#comment-16754882
 ] 

Marco Gaido commented on SPARK-26767:
-

IIRC a similar JIRA was reported. Could you please try a newer version 
(ideally the current branch-2.3)? This may have been fixed.

> Filter on a dropDuplicates dataframe gives inconsistency result
> ---
>
> Key: SPARK-26767
> URL: https://issues.apache.org/jira/browse/SPARK-26767
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.
>Reporter: Jeffrey
>Priority: Major
>
> To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result

2019-01-29 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-26767:
-
Component/s: (was: Build)
 SQL

> Filter on a dropDuplicates dataframe gives inconsistency result
> ---
>
> Key: SPARK-26767
> URL: https://issues.apache.org/jira/browse/SPARK-26767
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.
>Reporter: Jeffrey
>Priority: Major
>
> To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result

2019-01-29 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-26767:
-
Priority: Major  (was: Blocker)

> Filter on a dropDuplicates dataframe gives inconsistency result
> ---
>
> Key: SPARK-26767
> URL: https://issues.apache.org/jira/browse/SPARK-26767
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 2.3.0
> Environment: To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.
>Reporter: Jeffrey
>Priority: Major
>
> To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result

2019-01-29 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754880#comment-16754880
 ] 

Hyukjin Kwon commented on SPARK-26767:
--

Please avoid setting Critical+, which is usually reserved for committers.

> Filter on a dropDuplicates dataframe gives inconsistency result
> ---
>
> Key: SPARK-26767
> URL: https://issues.apache.org/jira/browse/SPARK-26767
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.
>Reporter: Jeffrey
>Priority: Major
>
> To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26766) Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens

2019-01-29 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754865#comment-16754865
 ] 

Gabor Somogyi commented on SPARK-26766:
---

[~vanzin] I was thinking about your 
[suggestion|https://github.com/apache/spark/pull/23499/files#diff-406f99efa37915001b613de47815e25cR54]
 and here are my findings.
* YarnSparkHadoopUtil.hadoopFSsToAccess covers everything that is needed, as you 
suggested
* On the other hand, it's placed in YARN, which makes it inaccessible from core
* I don't think it's a good idea to move either the mentioned function or the token 
provider

I see mainly the following ways:
* The token provider asks the manager for file systems: I would prefer this (see 
the sketch below)
* Use some sort of init function where parameters can be passed: not all the 
providers are interested in this param (actually only FS)
* Split the token provider into YARN... Mesos... variants and implement the 
filesystem-providing functions there: I think it's overkill

Unless you have a better idea, I'll go with the first one.
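
For discussion, a purely hypothetical sketch of the first option. All type and method 
names below are invented for illustration and do not exist in Spark:

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

// Hypothetical: the manager exposes the file systems, so providers no longer take
// the list as a parameter of obtainDelegationTokens.
trait DelegationTokenManagerView {
  def fileSystemsToAccess(): Set[FileSystem]
}

class HadoopFSTokenProviderSketch(manager: DelegationTokenManagerView) {
  def obtainDelegationTokens(hadoopConf: Configuration, creds: Credentials): Unit = {
    val fileSystems = manager.fileSystemsToAccess() // pulled on demand instead of passed in
    fileSystems.foreach { fs =>
      // fetch and add delegation tokens for fs into creds (elided in this sketch)
    }
  }
}
{code}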


> Remove the list of filesystems from 
> HadoopDelegationTokenProvider.obtainDelegationTokens
> 
>
> Key: SPARK-26766
> URL: https://issues.apache.org/jira/browse/SPARK-26766
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Priority: Major
>
> This was discussed in previous PR 
> [here|https://github.com/apache/spark/pull/23499/files#diff-406f99efa37915001b613de47815e25cR54].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result

2019-01-29 Thread Jeffrey (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeffrey updated SPARK-26767:

Description: 
o repeat the problem,

(1) create a csv file with records holding same values for a subset of columns 
(e.g. colA, colB, colC).

(2) read the csv file as a spark dataframe and then use dropDuplicates to dedup 
the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))

(3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
'A' and colB='B' and colG='G' and colH='H').show(100,False))

 

=> When (3) is rerun, it gives different number of resulting rows.

  was:Fe


> Filter on a dropDuplicates dataframe gives inconsistency result
> ---
>
> Key: SPARK-26767
> URL: https://issues.apache.org/jira/browse/SPARK-26767
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 2.3.0
> Environment: To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.
>Reporter: Jeffrey
>Priority: Blocker
>
> o repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result

2019-01-29 Thread Jeffrey (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeffrey updated SPARK-26767:

Description: 
To repeat the problem,

(1) create a csv file with records holding same values for a subset of columns 
(e.g. colA, colB, colC).

(2) read the csv file as a spark dataframe and then use dropDuplicates to dedup 
the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))

(3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
'A' and colB='B' and colG='G' and colH='H').show(100,False))

 

=> When (3) is rerun, it gives different number of resulting rows.

  was:
o repeat the problem,

(1) create a csv file with records holding same values for a subset of columns 
(e.g. colA, colB, colC).

(2) read the csv file as a spark dataframe and then use dropDuplicates to dedup 
the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))

(3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
'A' and colB='B' and colG='G' and colH='H').show(100,False))

 

=> When (3) is rerun, it gives different number of resulting rows.


> Filter on a dropDuplicates dataframe gives inconsistency result
> ---
>
> Key: SPARK-26767
> URL: https://issues.apache.org/jira/browse/SPARK-26767
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 2.3.0
> Environment: To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.
>Reporter: Jeffrey
>Priority: Blocker
>
> To repeat the problem,
> (1) create a csv file with records holding same values for a subset of 
> columns (e.g. colA, colB, colC).
> (2) read the csv file as a spark dataframe and then use dropDuplicates to 
> dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))
> (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
> 'A' and colB='B' and colG='G' and colH='H').show(100,False))
>  
> => When (3) is rerun, it gives different number of resulting rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result

2019-01-29 Thread Jeffrey (JIRA)
Jeffrey created SPARK-26767:
---

 Summary: Filter on a dropDuplicates dataframe gives inconsistency 
result
 Key: SPARK-26767
 URL: https://issues.apache.org/jira/browse/SPARK-26767
 Project: Spark
  Issue Type: Bug
  Components: Build
Affects Versions: 2.3.0
 Environment: To repeat the problem,

(1) create a csv file with records holding same values for a subset of columns 
(e.g. colA, colB, colC).

(2) read the csv file as a spark dataframe and then use dropDuplicates to dedup 
the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"]))

(3) select the resulting dataframe with where clause. (i.e. df.where("colA = 
'A' and colB='B' and colG='G' and colH='H').show(100,False))

 

=> When (3) is rerun, it gives different number of resulting rows.
Reporter: Jeffrey


Fe
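
The Environment field above holds the actual repro steps; a Scala sketch of them 
follows (the original report appears to use PySpark, and the path and literal values 
below are placeholders):

{code:java}
// Repro sketch of steps (1)-(3). Path and literal values are placeholders.
val df = spark.read.option("header", "true").csv("/tmp/dupes.csv")
val deduped = df.dropDuplicates(Seq("colA", "colB", "colC"))
// Re-running the same filter reportedly returns a different number of rows each time:
deduped.where("colA = 'A' and colB = 'B' and colG = 'G' and colH = 'H'").show(100, false)
{code}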



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26766) Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens

2019-01-29 Thread Gabor Somogyi (JIRA)
Gabor Somogyi created SPARK-26766:
-

 Summary: Remove the list of filesystems from 
HadoopDelegationTokenProvider.obtainDelegationTokens
 Key: SPARK-26766
 URL: https://issues.apache.org/jira/browse/SPARK-26766
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Gabor Somogyi


This was discussed in a previous PR 
[here|https://github.com/apache/spark/pull/23499/files#diff-406f99efa37915001b613de47815e25cR54].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26765) Implement supportDataType API in Avro data source

2019-01-29 Thread Gengliang Wang (JIRA)
Gengliang Wang created SPARK-26765:
--

 Summary: Implement supportDataType API in Avro data source
 Key: SPARK-26765
 URL: https://issues.apache.org/jira/browse/SPARK-26765
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.0.0
Reporter: Gengliang Wang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26765) Implement supportDataType API in Avro data source

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26765:


Assignee: (was: Apache Spark)

> Implement supportDataType API in Avro data source
> -
>
> Key: SPARK-26765
> URL: https://issues.apache.org/jira/browse/SPARK-26765
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-26765) Implement supportDataType API in Avro data source

2019-01-29 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26765:


Assignee: Apache Spark

> Implement supportDataType API in Avro data source
> -
>
> Key: SPARK-26765
> URL: https://issues.apache.org/jira/browse/SPARK-26765
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Gengliang Wang
>Assignee: Apache Spark
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26764) [SPIP] Spark Relational Cache

2019-01-29 Thread Adrian Wang (JIRA)
Adrian Wang created SPARK-26764:
---

 Summary: [SPIP] Spark Relational Cache
 Key: SPARK-26764
 URL: https://issues.apache.org/jira/browse/SPARK-26764
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 2.4.0
Reporter: Adrian Wang
 Attachments: Relational+Cache+SPIP.pdf

In modern database systems, a relational cache is a common technique for speeding up 
ad-hoc queries. While Spark provides caching natively, Spark SQL should be able 
to exploit the relationships between relations to speed up all queries that can 
benefit. In this SPIP, we propose to let Spark use all defined cached relations 
where possible, without requiring explicit substitution in the user query, and to 
keep some user-defined caches available across sessions. Materialized views in many 
database systems provide a similar function.
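
As a hedged illustration of the kind of rewrite this targets (the table and column 
names are hypothetical, not part of the SPIP): today the cached aggregate below is 
only used when a query references it explicitly, whereas the proposal would let the 
planner answer a matching ad-hoc query from it automatically.

{code:java}
import org.apache.spark.sql.functions.sum

// A pre-aggregated relation, cached up front (hypothetical tables/columns).
val salesByStore = spark.table("sales").groupBy("store").agg(sum("amount").as("total"))
salesByStore.createOrReplaceTempView("sales_by_store")
spark.table("sales_by_store").cache()

// A later ad-hoc query over the base table. With the relational cache proposed
// here, the planner could answer it from sales_by_store instead of rescanning
// sales; today the user would have to reference sales_by_store explicitly.
spark.sql("SELECT store, SUM(amount) AS total FROM sales GROUP BY store").show()
{code}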



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26764) [SPIP] Spark Relational Cache

2019-01-29 Thread Adrian Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adrian Wang updated SPARK-26764:

Attachment: Relational+Cache+SPIP.pdf

> [SPIP] Spark Relational Cache
> -
>
> Key: SPARK-26764
> URL: https://issues.apache.org/jira/browse/SPARK-26764
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Adrian Wang
>Priority: Major
> Attachments: Relational+Cache+SPIP.pdf
>
>
> In modern database systems, a relational cache is a common technique for speeding 
> up ad-hoc queries. While Spark provides caching natively, Spark SQL should be able 
> to exploit the relationships between relations to speed up all queries that can 
> benefit. In this SPIP, we propose to let Spark use all defined cached relations 
> where possible, without requiring explicit substitution in the user query, and to 
> keep some user-defined caches available across sessions. Materialized views in 
> many database systems provide a similar function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26752) Multiple aggregate methods in the same column in DataFrame

2019-01-29 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26752.
--
Resolution: Won't Fix

> Multiple aggregate methods in the same column in DataFrame
> --
>
> Key: SPARK-26752
> URL: https://issues.apache.org/jira/browse/SPARK-26752
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Guilherme Beltramini
>Priority: Minor
>
> The agg function in 
> [org.apache.spark.sql.RelationalGroupedDataset|https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.RelationalGroupedDataset]
>  accepts as input:
>  * Column*
>  * Map[String, String]
>  * (String, String)*
> I'm proposing to add Map[String, Seq[String]], where the keys are the columns 
> to aggregate, and the values are the aggregation functions to apply. Here 
> is a similar question: 
> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-multiple-agg-on-the-same-column-td29541.html.
> In the example below (running in spark-shell, with Spark 2.4.0), I'm showing 
> a workaround. What I'm proposing is that agg should accept aggMap as input:
> {code:java}
> scala> val df = Seq(("a", 1), ("a", 2), ("a", 3), ("a", 4), ("b", 10), ("b", 
> 20), ("c", 100)).toDF("col1", "col2")
> df: org.apache.spark.sql.DataFrame = [col1: string, col2: int]
> scala> df.show
> +++
> |col1|col2|
> +++
> |   a|   1|
> |   a|   2|
> |   a|   3|
> |   a|   4|
> |   b|  10|
> |   b|  20|
> |   c| 100|
> +++
> scala> val aggMap = Map("col1" -> Seq("count"), "col2" -> Seq("min", "max", 
> "mean"))
> aggMap: scala.collection.immutable.Map[String,Seq[String]] = Map(col1 -> 
> List(count), col2 -> List(min, max, mean))
> scala> val aggSeq = aggMap.toSeq.flatMap{ case (c: String, fns: Seq[String]) 
> => Seq(c).zipAll(fns, c, "") }
> aggSeq: Seq[(String, String)] = ArrayBuffer((col1,count), (col2,min), 
> (col2,max), (col2,mean))
> scala> val dfAgg = df.groupBy("col1").agg(aggSeq.head, aggSeq.tail: _*)
> dfAgg: org.apache.spark.sql.DataFrame = [col1: string, count(col1): bigint 
> ... 3 more fields]
> scala> dfAgg.orderBy("col1").show
> ++---+-+-+-+
> |col1|count(col1)|min(col2)|max(col2)|avg(col2)|
> ++---+-+-+-+
> |   a|  4|1|4|  2.5|
> |   b|  2|   10|   20| 15.0|
> |   c|  1|  100|  100|100.0|
> ++---+-+-+-+
> {code}
>  
>  
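
Since the overload was not added (the issue is resolved as Won't Fix), a small 
user-side helper can give the proposed call shape on top of the existing 
(String, String)* overload. A sketch reusing the aggMap from the quoted example; 
the helper is not part of Spark:

{code:java}
import org.apache.spark.sql.{DataFrame, RelationalGroupedDataset}

// Expand Map[String, Seq[String]] into the (String, String)* pairs that
// RelationalGroupedDataset.agg already accepts.
def aggAll(grouped: RelationalGroupedDataset, aggMap: Map[String, Seq[String]]): DataFrame = {
  val pairs = aggMap.toSeq.flatMap { case (col, fns) => fns.map(fn => col -> fn) }
  grouped.agg(pairs.head, pairs.tail: _*)
}

// Usage, e.g. in spark-shell:
// aggAll(df.groupBy("col1"), Map("col1" -> Seq("count"), "col2" -> Seq("min", "max", "mean")))
{code}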



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26760) [Spark Incorrect display in SPARK UI Executor Tab when number of cores is 4 and Active Task display as 5 in Executor Tab of SPARK UI]

2019-01-29 Thread ABHISHEK KUMAR GUPTA (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ABHISHEK KUMAR GUPTA updated SPARK-26760:
-
Summary: [Spark Incorrect display in SPARK UI Executor Tab when number of 
cores is 4 and Active Task display as 5 in Executor Tab of SPARK UI]  (was: 
[Spark Incorrect display in YARN UI Executor Tab when number of cores is 4 and 
Active Task display as 5 in Executor Tab of YARN UI])

> [Spark Incorrect display in SPARK UI Executor Tab when number of cores is 4 
> and Active Task display as 5 in Executor Tab of SPARK UI]
> -
>
> Key: SPARK-26760
> URL: https://issues.apache.org/jira/browse/SPARK-26760
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.0
> Environment: Spark 2.4
>Reporter: ABHISHEK KUMAR GUPTA
>Priority: Major
> Attachments: SPARK-26760.png
>
>
> Steps:
>  # Launch Spark Shell 
>  # bin/spark-shell --master yarn  --conf spark.dynamicAllocation.enabled=true 
> --conf spark.dynamicAllocation.initialExecutors=3 --conf 
> spark.dynamicAllocation.minExecutors=1 --conf 
> spark.dynamicAllocation.executorIdleTimeout=60s --conf 
> spark.dynamicAllocation.maxExecutors=5
>  # Submit a Job sc.parallelize(1 to 1,116000).count()
>  # Check the YARN UI Executor Tab for the RUNNING application
>  # UI displays the Number of Cores as 4, while the Active Tasks column shows 5
> Expected:
> The number of Active Tasks should be the same as the Number of Cores.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


