[jira] [Assigned] (SPARK-26780) Improve shuffle read using ReadAheadInputStream
[ https://issues.apache.org/jira/browse/SPARK-26780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26780:
------------------------------------

    Assignee: Apache Spark

> Improve shuffle read using ReadAheadInputStream
> -----------------------------------------------
>
>             Key: SPARK-26780
>             URL: https://issues.apache.org/jira/browse/SPARK-26780
>         Project: Spark
>      Issue Type: Improvement
>      Components: Shuffle
> Affects Versions: 3.0.0
>        Reporter: liuxian
>        Assignee: Apache Spark
>        Priority: Major
>
> Use _ReadAheadInputStream_ to improve shuffle read performance.
> _ReadAheadInputStream_ can reduce CPU utilization with almost no performance
> regression.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26780) Improve shuffle read using ReadAheadInputStream
[ https://issues.apache.org/jira/browse/SPARK-26780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26780:
------------------------------------

    Assignee: (was: Apache Spark)

> Improve shuffle read using ReadAheadInputStream
> -----------------------------------------------
>
>             Key: SPARK-26780
>             URL: https://issues.apache.org/jira/browse/SPARK-26780
>         Project: Spark
>      Issue Type: Improvement
>      Components: Shuffle
> Affects Versions: 3.0.0
>        Reporter: liuxian
>        Priority: Major
>
> Use _ReadAheadInputStream_ to improve shuffle read performance.
> _ReadAheadInputStream_ can reduce CPU utilization with almost no performance
> regression.
[jira] [Created] (SPARK-26780) Improve shuffle read using ReadAheadInputStream
liuxian created SPARK-26780:
-------------------------------

             Summary: Improve shuffle read using ReadAheadInputStream
                 Key: SPARK-26780
                 URL: https://issues.apache.org/jira/browse/SPARK-26780
             Project: Spark
          Issue Type: Improvement
          Components: Shuffle
    Affects Versions: 3.0.0
            Reporter: liuxian

Use _ReadAheadInputStream_ to improve shuffle read performance.
_ReadAheadInputStream_ can reduce CPU utilization with almost no performance regression.
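The prefetching idea behind a read-ahead stream can be sketched as follows. This is a minimal, hypothetical illustration, not Spark's actual `ReadAheadInputStream` implementation (which wraps `InputStream` and manages a pair of swap buffers): a background thread fills the next chunk while the caller is still consuming the current one, so I/O wait overlaps with processing.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of read-ahead: prefetch the next chunk on a worker
// thread while the caller consumes the current one.
class SimpleReadAhead implements AutoCloseable {
    private final InputStream in;
    private final int chunkSize;
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private Future<byte[]> pending;

    SimpleReadAhead(InputStream in, int chunkSize) {
        this.in = in;
        this.chunkSize = chunkSize;
        this.pending = worker.submit(this::readChunk); // start prefetching immediately
    }

    private byte[] readChunk() throws IOException {
        byte[] buf = new byte[chunkSize];
        int n = in.read(buf);
        if (n < 0) return new byte[0];        // empty array signals EOF
        byte[] out = new byte[n];
        System.arraycopy(buf, 0, out, 0, n);
        return out;
    }

    /** Returns the next chunk (empty at EOF) and kicks off the following prefetch. */
    byte[] nextChunk() {
        try {
            byte[] chunk = pending.get();               // wait for the prefetched chunk
            pending = worker.submit(this::readChunk);   // immediately prefetch the next one
            return chunk;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() { worker.shutdownNow(); }
}
```

The CPU saving claimed in the ticket comes from overlapping decompression/deserialization with disk reads; the sketch above only shows the overlap structure, not Spark's buffer reuse.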
[jira] [Commented] (SPARK-24360) Support Hive 3.1 metastore
[ https://issues.apache.org/jira/browse/SPARK-24360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755770#comment-16755770 ]

Dongjoon Hyun commented on SPARK-24360:
---------------------------------------

I made a new PR for HMS 3.1.

> Support Hive 3.1 metastore
> --------------------------
>
>             Key: SPARK-24360
>             URL: https://issues.apache.org/jira/browse/SPARK-24360
>         Project: Spark
>      Issue Type: Improvement
>      Components: SQL
> Affects Versions: 2.4.0
>        Reporter: Dongjoon Hyun
>        Priority: Major
>
> Hive 3.1.0 has been released. This issue aims to support Hive Metastore 3.1.
[jira] [Comment Edited] (SPARK-25420) Dataset.count() every time is different.
[ https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755762#comment-16755762 ]

huanghuai edited comment on SPARK-25420 at 1/30/19 7:13 AM:
------------------------------------------------------------

Code to reproduce:

{code:java}
SparkSession spark = SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions", 3);
spark.conf().set("spark.default.parallelism", 3);
List<Row> data1 = Arrays.asList(
    RowFactory.create("jack", 22, "gz"),
    RowFactory.create("lisi", 22, "sh"),
    RowFactory.create("wangwu", 22, "tj"),
    RowFactory.create("leo", 21, "alibaba"),
    RowFactory.create("a", 22, "sz"),
    RowFactory.create("b", 22, "sz1"),
    RowFactory.create("c", 22, "sz2"),
    RowFactory.create("d", 22, "sz3"),
    RowFactory.create("mic", 22, "bc")
);
StructType schema1 = new StructType(new StructField[]{
    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
    new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("address", DataTypes.StringType, true, Metadata.empty()),
});
for (int i = 0; i < 5; i++) {
    Dataset<Row> dropDuplicates = spark.createDataFrame(data1, schema1)
        .repartition(2) // with repartition(1), (2) or (3), show() prints a different result each time
        .dropDuplicates("age");
    dropDuplicates.show();
    dropDuplicates = dropDuplicates.filter("address like 'sz%'");
    System.out.println("count=" + dropDuplicates.count());
    // Another problem: filter("address='sz2'").show() returns two records,
    // but filter("address='sz2'").count() returns 0.
}
{code}

It is not a bug in itself, just a usage problem, or something else.

> Dataset.count() every time is different.
> -----------------------------------------
>
>             Key: SPARK-25420
>             URL: https://issues.apache.org/jira/browse/SPARK-25420
>         Project: Spark
>      Issue Type: Question
>      Components: Spark Core
> Affects Versions: 2.3.0
>     Environment: spark2.3
>                  standalone
>        Reporter: huanghuai
>        Priority: Major
>          Labels: SQL
>
> {code:java}
> Dataset<Row> dataset = sparkSession.read().format("csv").option("sep", ",").option("inferSchema", "true")
>     .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>     .option("encoding", "UTF-8")
>     .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count=" + dataset.count());
> Dataset<Row> dropDuplicates = dataset.dropDuplicates(new String[]{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1=" + dropDuplicates.count());
> System.out.println("dropDuplicates count2=" + dropDuplicates.count());
> Dataset<Row> filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 and (status = 0 or status = 1)");
> System.out.println("filter count1=" + filter.count());
> System.out.println("filter count2=" + filter.count());
> System.out.println("filter count3=" + filter.count());
> System.out.println("filter count4=" + filter.count());
> System.out.println("filter count5=" + filter.count());
> {code}
[jira] [Resolved] (SPARK-26378) Queries of wide CSV/JSON data slowed after SPARK-26151
[ https://issues.apache.org/jira/browse/SPARK-26378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-26378.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 23336
[https://github.com/apache/spark/pull/23336]

> Queries of wide CSV/JSON data slowed after SPARK-26151
> ------------------------------------------------------
>
>             Key: SPARK-26378
>             URL: https://issues.apache.org/jira/browse/SPARK-26378
>         Project: Spark
>      Issue Type: Improvement
>      Components: SQL
> Affects Versions: 3.0.0
>        Reporter: Bruce Robbins
>        Assignee: Bruce Robbins
>        Priority: Major
>        Fix For: 3.0.0
>
> A recent change significantly slowed queries of wide CSV tables. For
> example, queries against a 6000-column table slowed by 45-48% when run
> with a single executor.
>
> The [PR for SPARK-26151|https://github.com/apache/spark/commit/11e5f1bcd49eec8ab4225d6e68a051b5c6a21cb2]
> changed FailureSafeParser#toResultRow such that the returned function
> recreates every row, even when the associated input record has no parsing
> issues and the user specified no corrupt-record field in the schema. This
> extra processing is responsible for the slowdown.
> The change to FailureSafeParser also impacted queries of wide JSON tables.
> I propose that a row should be recreated only if there is a parsing error or
> columns need to be shifted due to the existence of a corrupt-record field in
> the user-supplied schema. Otherwise, the row should be used as-is.
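The fast-path/slow-path split proposed above can be sketched as follows. This is a hypothetical illustration (the names, the `String[]` row representation, and the helper are invented, not Spark's actual FailureSafeParser code): the conversion function is chosen once up front, so the common case never pays for a per-row copy.

```java
import java.util.function.UnaryOperator;

// Hypothetical sketch: pick the row-conversion function once instead of
// re-checking (and rebuilding) on every row.
class ResultRowSketch {
    static UnaryOperator<String[]> toResultRow(boolean hasCorruptField) {
        if (!hasCorruptField) {
            return row -> row; // fast path: return the parsed row as-is, no copy
        }
        return row -> {        // slow path: shift columns and append the corrupt-record field
            String[] out = new String[row.length + 1];
            System.arraycopy(row, 0, out, 0, row.length);
            out[row.length] = null; // corrupt-record column; null when the record parsed cleanly
            return out;
        };
    }
}
```

With a 6000-column row, skipping that copy on every record is exactly the kind of saving the ticket describes.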
[jira] [Assigned] (SPARK-26378) Queries of wide CSV/JSON data slowed after SPARK-26151
[ https://issues.apache.org/jira/browse/SPARK-26378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon reassigned SPARK-26378:
------------------------------------

    Assignee: Bruce Robbins

> Queries of wide CSV/JSON data slowed after SPARK-26151
> ------------------------------------------------------
>
>             Key: SPARK-26378
>             URL: https://issues.apache.org/jira/browse/SPARK-26378
>         Project: Spark
>      Issue Type: Improvement
>      Components: SQL
> Affects Versions: 3.0.0
>        Reporter: Bruce Robbins
>        Assignee: Bruce Robbins
>        Priority: Major
>
> A recent change significantly slowed queries of wide CSV tables. For
> example, queries against a 6000-column table slowed by 45-48% when run
> with a single executor.
>
> The [PR for SPARK-26151|https://github.com/apache/spark/commit/11e5f1bcd49eec8ab4225d6e68a051b5c6a21cb2]
> changed FailureSafeParser#toResultRow such that the returned function
> recreates every row, even when the associated input record has no parsing
> issues and the user specified no corrupt-record field in the schema. This
> extra processing is responsible for the slowdown.
> The change to FailureSafeParser also impacted queries of wide JSON tables.
> I propose that a row should be recreated only if there is a parsing error or
> columns need to be shifted due to the existence of a corrupt-record field in
> the user-supplied schema. Otherwise, the row should be used as-is.
[jira] [Comment Edited] (SPARK-25420) Dataset.count() every time is different.
[ https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755762#comment-16755762 ]

huanghuai edited comment on SPARK-25420 at 1/30/19 7:11 AM:
------------------------------------------------------------

Code to reproduce:

{code:java}
SparkSession spark = SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions", 3);
spark.conf().set("spark.default.parallelism", 3);
List<Row> data1 = Arrays.asList(
    RowFactory.create("jack", 22, "gz"),
    RowFactory.create("lisi", 22, "sh"),
    RowFactory.create("wangwu", 22, "tj"),
    RowFactory.create("leo", 21, "alibaba"),
    RowFactory.create("a", 22, "sz"),
    RowFactory.create("b", 22, "sz1"),
    RowFactory.create("c", 22, "sz2"),
    RowFactory.create("d", 22, "sz3"),
    RowFactory.create("mic", 22, "bc")
);
StructType schema1 = new StructType(new StructField[]{
    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
    new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("address", DataTypes.StringType, true, Metadata.empty()),
});
for (int i = 0; i < 5; i++) {
    Dataset<Row> dropDuplicates = spark.createDataFrame(data1, schema1)
        .repartition(2) // with repartition(1), (2) or (3), show() prints a different result each time
        .dropDuplicates("age");
    dropDuplicates.show();
    dropDuplicates = dropDuplicates.filter("address like 'sz%'");
    System.out.println("count=" + dropDuplicates.count());
    // Another problem: filter("address='sz2'").show() returns two records,
    // but filter("address='sz2'").count() returns 0.
}
{code}

It is not a bug in itself, just a usage problem, or something else.

was (Author: zhiyin1233):

Code to reproduce:

{code:java}
SparkSession spark = SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions", 3);
spark.conf().set("spark.default.parallelism", 3);
List<Row> data1 = Arrays.asList(
    RowFactory.create("jack", 22, "gz"),
    RowFactory.create("lisi", 22, "sh"),
    RowFactory.create("wangwu", 22, "tj"),
    RowFactory.create("leo", 21, "alibaba"),
    RowFactory.create("a", 22, "sz"),
    RowFactory.create("b", 22, "sz1"),
    RowFactory.create("c", 22, "sz2"),
    RowFactory.create("d", 22, "sz3"),
    RowFactory.create("mic", 22, "bc")
);
StructType schema1 = new StructType(new StructField[]{
    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
    new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("address", DataTypes.StringType, true, Metadata.empty()),
});
for (int i = 0; i < 5; i++) {
    Dataset<Row> dropDuplicates = spark.createDataFrame(data1, schema1)
        .repartition(2) // with repartition(1), (2) or (3), show() prints a different result each time
        .dropDuplicates("dropDuplicates");
    dropDuplicates.show();
    dropDuplicates = dropDuplicates.filter("address like 'sz%'");
    System.out.println("count=" + dropDuplicates.count());
    // Another problem: filter("address='sz2'").show() returns two records,
    // but filter("address='sz2'").count() returns 0.
}
{code}

It is not a bug in itself, just a usage problem, or something else.

> Dataset.count() every time is different.
> -----------------------------------------
>
>             Key: SPARK-25420
>             URL: https://issues.apache.org/jira/browse/SPARK-25420
>         Project: Spark
>      Issue Type: Question
>      Components: Spark Core
> Affects Versions: 2.3.0
>     Environment: spark2.3
>                  standalone
>        Reporter: huanghuai
>        Priority: Major
>          Labels: SQL
>
> {code:java}
> Dataset<Row> dataset = sparkSession.read().format("csv").option("sep", ",").option("inferSchema", "true")
>     .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>     .option("encoding", "UTF-8")
>     .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count=" + dataset.count());
> Dataset<Row> dropDuplicates = dataset.dropDuplicates(new String[]{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1=" + dropDuplicates.count());
> System.out.println("dropDuplicates count2=" + dropDuplicates.count());
> Dataset<Row> filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 and (status = 0 or status = 1)");
> System.out.println("filter count1=" + filter.count());
> System.out.println("filter count2=" + filter.count());
> System.out.println("filter count3=" + filter.count());
> System.out.println("filter count4=" + filter.count());
> System.out.println("filter count5=" + filter.count());
> {code}
[jira] [Comment Edited] (SPARK-25420) Dataset.count() every time is different.
[ https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755762#comment-16755762 ]

huanghuai edited comment on SPARK-25420 at 1/30/19 7:06 AM:
------------------------------------------------------------

Code to reproduce:

{code:java}
SparkSession spark = SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions", 3);
spark.conf().set("spark.default.parallelism", 3);
List<Row> data1 = Arrays.asList(
    RowFactory.create("jack", 22, "gz"),
    RowFactory.create("lisi", 22, "sh"),
    RowFactory.create("wangwu", 22, "tj"),
    RowFactory.create("leo", 21, "alibaba"),
    RowFactory.create("a", 22, "sz"),
    RowFactory.create("b", 22, "sz1"),
    RowFactory.create("c", 22, "sz2"),
    RowFactory.create("d", 22, "sz3"),
    RowFactory.create("mic", 22, "bc")
);
StructType schema1 = new StructType(new StructField[]{
    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
    new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("address", DataTypes.StringType, true, Metadata.empty()),
});
for (int i = 0; i < 5; i++) {
    Dataset<Row> dropDuplicates = spark.createDataFrame(data1, schema1)
        .repartition(2) // with repartition(1), (2) or (3), show() prints a different result each time
        .dropDuplicates("dropDuplicates");
    dropDuplicates.show();
    dropDuplicates = dropDuplicates.filter("address like 'sz%'");
    System.out.println("count=" + dropDuplicates.count());
    // Another problem: filter("address='sz2'").show() returns two records,
    // but filter("address='sz2'").count() returns 0.
}
{code}

It is not a bug in itself, just a usage problem, or something else.

> Dataset.count() every time is different.
> -----------------------------------------
>
>             Key: SPARK-25420
>             URL: https://issues.apache.org/jira/browse/SPARK-25420
>         Project: Spark
>      Issue Type: Question
>      Components: Spark Core
> Affects Versions: 2.3.0
>     Environment: spark2.3
>                  standalone
>        Reporter: huanghuai
>        Priority: Major
>          Labels: SQL
>
> {code:java}
> Dataset<Row> dataset = sparkSession.read().format("csv").option("sep", ",").option("inferSchema", "true")
>     .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>     .option("encoding", "UTF-8")
>     .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count=" + dataset.count());
> Dataset<Row> dropDuplicates = dataset.dropDuplicates(new String[]{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1=" + dropDuplicates.count());
> System.out.println("dropDuplicates count2=" + dropDuplicates.count());
> Dataset<Row> filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 and (status = 0 or status = 1)");
> System.out.println("filter count1=" + filter.count());
> System.out.println("filter count2=" + filter.count());
> System.out.println("filter count3=" + filter.count());
> System.out.println("filter count4=" + filter.count());
> System.out.println("filter count5=" + filter.count());
> {code}
[jira] [Commented] (SPARK-25420) Dataset.count() every time is different.
[ https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755762#comment-16755762 ]

huanghuai commented on SPARK-25420:
---

Code to reproduce:

{code:java}
SparkSession spark = SparkSession.builder().appName("test").master("local[2]").master("local").getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions", 3);
spark.conf().set("spark.default.parallelism", 3);
List<Row> data1 = Arrays.asList(
    RowFactory.create("jack", 22, "gz"),
    RowFactory.create("lisi", 22, "sh"),
    RowFactory.create("wangwu", 22, "tj"),
    RowFactory.create("leo", 21, "alibaba"),
    RowFactory.create("a", 22, "sz"),
    RowFactory.create("b", 22, "sz1"),
    RowFactory.create("c", 22, "sz2"),
    RowFactory.create("d", 22, "sz3"),
    RowFactory.create("mic", 22, "bc")
);
StructType schema1 = new StructType(new StructField[]{
    new StructField("name", DataTypes.StringType, true, Metadata.empty()),
    new StructField("age", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("address", DataTypes.StringType, true, Metadata.empty()),
});
for (int i = 0; i < 5; i++) {
    Dataset<Row> dropDuplicates = spark.createDataFrame(data1, schema1)
        .repartition(2) // with repartition(1), (2) or (3), show() prints a different result each time
        .dropDuplicates("dropDuplicates");
    dropDuplicates.show();
    dropDuplicates = dropDuplicates.filter("address like 'sz%'");
    System.out.println("count=" + dropDuplicates.count());
    // Another problem: filter("address='sz2'").show() returns two records,
    // but filter("address='sz2'").count() returns 0.
}
{code}

It is not a bug in itself, just a usage problem, or something else.

> Dataset.count() every time is different.
> -----------------------------------------
>
>             Key: SPARK-25420
>             URL: https://issues.apache.org/jira/browse/SPARK-25420
>         Project: Spark
>      Issue Type: Question
>      Components: Spark Core
> Affects Versions: 2.3.0
>     Environment: spark2.3
>                  standalone
>        Reporter: huanghuai
>        Priority: Major
>          Labels: SQL
>
> {code:java}
> Dataset<Row> dataset = sparkSession.read().format("csv").option("sep", ",").option("inferSchema", "true")
>     .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true")
>     .option("encoding", "UTF-8")
>     .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count=" + dataset.count());
> Dataset<Row> dropDuplicates = dataset.dropDuplicates(new String[]{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1=" + dropDuplicates.count());
> System.out.println("dropDuplicates count2=" + dropDuplicates.count());
> Dataset<Row> filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 and (status = 0 or status = 1)");
> System.out.println("filter count1=" + filter.count());
> System.out.println("filter count2=" + filter.count());
> System.out.println("filter count3=" + filter.count());
> System.out.println("filter count4=" + filter.count());
> System.out.println("filter count5=" + filter.count());
> {code}
> The above is the code.
> ---
>
> Console output:
> source count=459275
> dropDuplicates count1=453987
> dropDuplicates count2=453987
> filter count1=445798
> filter count2=445797
> filter count3=445797
> filter count4=445798
> filter count5=445799
>
> Question: why is filter.count() different every time?
> If I remove dropDuplicates(), everything is OK!
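One plausible explanation of the varying counts (an illustration, not a statement about Spark's internals): `dropDuplicates` keeps one arbitrary row per key, and which row survives depends on the order rows arrive in each partition, which a shuffle makes nondeterministic. A later filter on a non-key column then counts different survivors on different runs. A Spark-free sketch of the effect, with hypothetical names:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo: dedup-by-key keeps the first row encountered per key,
// so the encounter order decides which row a downstream filter sees.
class DedupOrderDemo {
    record Row(String name, int age, String address) {}

    static long filteredCount(List<Row> rows) {
        Map<Integer, Row> byAge = new LinkedHashMap<>();
        for (Row r : rows) byAge.putIfAbsent(r.age(), r); // keep first occurrence per age
        return byAge.values().stream()
                .filter(r -> r.address().startsWith("sz")) // filter on a non-key column
                .count();
    }
}
```

Feeding the same two rows in two different orders yields counts 1 and 0: whichever age-22 row is encountered first survives the dedup, and only one of them matches the address filter.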
[jira] [Assigned] (SPARK-26768) Remove useless code in BlockManager
[ https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26768:
------------------------------------

    Assignee: Apache Spark

> Remove useless code in BlockManager
> -----------------------------------
>
>             Key: SPARK-26768
>             URL: https://issues.apache.org/jira/browse/SPARK-26768
>         Project: Spark
>      Issue Type: Improvement
>      Components: Spark Core
> Affects Versions: 2.4.0
>        Reporter: liupengcheng
>        Assignee: Apache Spark
>        Priority: Major
>     Attachments: Selection_037.jpg
>
> Recently, while reading the code of `BlockManager.getBlockData`, I found dead
> code that can never be reached. The relevant code is shown below:
>
> {code:java}
> override def getBlockData(blockId: BlockId): ManagedBuffer = {
>   if (blockId.isShuffle) {
>     shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
>   } else {
>     getLocalBytes(blockId) match {
>       case Some(blockData) =>
>         new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
>       case None =>
>         // If this block manager receives a request for a block that it doesn't have then it's
>         // likely that the master has outdated block statuses for this block. Therefore, we send
>         // an RPC so that this block is marked as being unavailable from this block manager.
>         reportBlockStatus(blockId, BlockStatus.empty)
>         throw new BlockNotFoundException(blockId.toString)
>     }
>   }
> }
> {code}
> {code:java}
> def getLocalBytes(blockId: BlockId): Option[BlockData] = {
>   logDebug(s"Getting local block $blockId as bytes")
>   // As an optimization for map output fetches, if the block is for a shuffle, return it
>   // without acquiring a lock; the disk store never deletes (recent) items so this should work
>   if (blockId.isShuffle) {
>     val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
>     // TODO: This should gracefully handle case where local block is not available. Currently
>     // downstream code will throw an exception.
>     val buf = new ChunkedByteBuffer(
>       shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
>     Some(new ByteBufferBlockData(buf, true))
>   } else {
>     blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
>   }
> }
> {code}
> `blockId.isShuffle` is checked twice; however, in the call hierarchy of
> `BlockManager.getLocalBytes`, the other call site is in `TorrentBroadcast.readBlocks`,
> where the blockId can never be a `ShuffleBlockId`.
> !Selection_037.jpg!
> So I think we should remove this dead code for easier reading.
[jira] [Assigned] (SPARK-26768) Remove useless code in BlockManager
[ https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26768:
------------------------------------

    Assignee: (was: Apache Spark)

> Remove useless code in BlockManager
> -----------------------------------
>
>             Key: SPARK-26768
>             URL: https://issues.apache.org/jira/browse/SPARK-26768
>         Project: Spark
>      Issue Type: Improvement
>      Components: Spark Core
> Affects Versions: 2.4.0
>        Reporter: liupengcheng
>        Priority: Major
>     Attachments: Selection_037.jpg
>
> Recently, while reading the code of `BlockManager.getBlockData`, I found dead
> code that can never be reached. The relevant code is shown below:
>
> {code:java}
> override def getBlockData(blockId: BlockId): ManagedBuffer = {
>   if (blockId.isShuffle) {
>     shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
>   } else {
>     getLocalBytes(blockId) match {
>       case Some(blockData) =>
>         new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
>       case None =>
>         // If this block manager receives a request for a block that it doesn't have then it's
>         // likely that the master has outdated block statuses for this block. Therefore, we send
>         // an RPC so that this block is marked as being unavailable from this block manager.
>         reportBlockStatus(blockId, BlockStatus.empty)
>         throw new BlockNotFoundException(blockId.toString)
>     }
>   }
> }
> {code}
> {code:java}
> def getLocalBytes(blockId: BlockId): Option[BlockData] = {
>   logDebug(s"Getting local block $blockId as bytes")
>   // As an optimization for map output fetches, if the block is for a shuffle, return it
>   // without acquiring a lock; the disk store never deletes (recent) items so this should work
>   if (blockId.isShuffle) {
>     val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
>     // TODO: This should gracefully handle case where local block is not available. Currently
>     // downstream code will throw an exception.
>     val buf = new ChunkedByteBuffer(
>       shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
>     Some(new ByteBufferBlockData(buf, true))
>   } else {
>     blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
>   }
> }
> {code}
> `blockId.isShuffle` is checked twice; however, in the call hierarchy of
> `BlockManager.getLocalBytes`, the other call site is in `TorrentBroadcast.readBlocks`,
> where the blockId can never be a `ShuffleBlockId`.
> !Selection_037.jpg!
> So I think we should remove this dead code for easier reading.
[jira] [Commented] (SPARK-26749) spark streaming kafka version for high version
[ https://issues.apache.org/jira/browse/SPARK-26749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755758#comment-16755758 ]

Hyukjin Kwon commented on SPARK-26749:
--------------------------------------

https://spark.apache.org/community.html

> spark streaming kafka version for high version
> ----------------------------------------------
>
>             Key: SPARK-26749
>             URL: https://issues.apache.org/jira/browse/SPARK-26749
>         Project: Spark
>      Issue Type: Question
>      Components: Java API
> Affects Versions: 2.2.0
>        Reporter: Chang Quanyou
>        Priority: Major
>
> When I use `spark-streaming-kafka-0-10_2.11` to consume a Kafka topic, the
> bundled dependency is kafka-clients:0.10.0.1, i.e. the inner Kafka client
> version is 0.10.0.1. Our Kafka cluster used to be 0.10.1, but now we will
> upgrade the cluster to 1.0 while the bundled dependency stays at 0.10.1.
> The documentation says 0.10.0 or higher is supported, so the Kafka client
> version does not match the Kafka cluster version. I tried it and saw no
> issue, but I don't think it's safe. What should I do?
[jira] [Commented] (SPARK-26699) Dataset column output discrepancies
[ https://issues.apache.org/jira/browse/SPARK-26699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755756#comment-16755756 ]

Hyukjin Kwon commented on SPARK-26699:
--------------------------------------

Please take a look at https://spark.apache.org/community.html

> Dataset column output discrepancies
> -----------------------------------
>
>             Key: SPARK-26699
>             URL: https://issues.apache.org/jira/browse/SPARK-26699
>         Project: Spark
>      Issue Type: Question
>      Components: Input/Output
> Affects Versions: 2.3.2
>        Reporter: Praveena
>        Priority: Major
>
> Hi,
>
> When I run my job in local mode (i.e. standalone in Eclipse) with the same
> parquet input files, the output is:
>
> locations
> [[[true, [[, phys...
> [[[true, [[, phys...
> [[[true, [[, phys...
> null
> [[[true, [[, phys...
> [[[true, [[, phys...
> [[[true, [[, phys...
> [[[true, [[, phys...
> [[[true, [[, phys...
> [[[true, [[, phys...
>
> But when I run the same code base with the same input parquet files in YARN
> cluster mode, my output is:
>
> locations
> [*WrappedArray*([tr...
> [*WrappedArray*([tr...
> [WrappedArray([tr...
> null
> [WrappedArray([tr...
> [WrappedArray([tr...
> [WrappedArray([tr...
> [WrappedArray([tr...
> [WrappedArray([tr...
> [WrappedArray([tr...
>
> It's appending WrappedArray :(
> I am using Apache Spark 2.3.2 and EMR 5.19.0. What could be the reason for
> the discrepancies in the output of certain table columns?
[jira] [Commented] (SPARK-25420) Dataset.count() every time is different.
[ https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755757#comment-16755757 ] Jungtaek Lim commented on SPARK-25420: -- [~jeffrey.mak] Hmm... the result looks odd, but it is also not easy to investigate unless we can narrow it down to a smaller reproducing dataset. [~mgaido] WDYT about Jeffrey's case? > Dataset.count() every time is different. > - > > Key: SPARK-25420 > URL: https://issues.apache.org/jira/browse/SPARK-25420 > Project: Spark > Issue Type: Question > Components: Spark Core >Affects Versions: 2.3.0 > Environment: spark2.3 > standalone >Reporter: huanghuai >Priority: Major > Labels: SQL > >
> Dataset<Row> dataset = sparkSession.read().format("csv")
>     .option("sep", ",")
>     .option("inferSchema", "true")
>     .option("escape", Constants.DEFAULT_CSV_ESCAPE)
>     .option("header", "true")
>     .option("encoding", "UTF-8")
>     .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv");
> System.out.println("source count=" + dataset.count());
> Dataset<Row> dropDuplicates = dataset.dropDuplicates(new String[]{"DATE","TIME","VEL","COMPANY"});
> System.out.println("dropDuplicates count1=" + dropDuplicates.count());
> System.out.println("dropDuplicates count2=" + dropDuplicates.count());
> Dataset<Row> filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 and (status = 0 or status = 1)");
> System.out.println("filter count1=" + filter.count());
> System.out.println("filter count2=" + filter.count());
> System.out.println("filter count3=" + filter.count());
> System.out.println("filter count4=" + filter.count());
> System.out.println("filter count5=" + filter.count());
> > --The above is the code > --- > > > console output: > source count=459275 > dropDuplicates count1=453987 > dropDuplicates count2=453987 > filter count1=445798 > filter count2=445797 > filter count3=445797 > filter count4=445798 > filter count5=445799 > > Question: > > Why is filter.count() different every time? > If I remove dropDuplicates(), everything is OK!!
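The varying counts above are consistent with how dropDuplicates works: it is free to keep any one row per duplicate key, and which survivor it keeps depends on the (nondeterministic) order in which rows reach each task. A hypothetical plain-Python analogy, not Spark code, showing how "keep the first row seen per key" changes its output when only the input order changes:

```python
# Illustration only (not Spark internals): deduplication that keeps the
# first row seen per key is deterministic only if the input order is.
def drop_duplicates(rows, key_cols):
    seen = {}
    for row in rows:
        k = tuple(row[c] for c in key_cols)
        if k not in seen:          # keep the first row observed for this key
            seen[k] = row
    return list(seen.values())

rows = [
    {"_c3": "A", "_c5": "John"},
    {"_c3": "A", "_c5": "Tom"},
    {"_c3": "B", "_c5": "Mary"},
    {"_c3": "B", "_c5": "Mabel"},
]

# Same key set, different arrival order -> different surviving rows.
out1 = drop_duplicates(rows, ["_c3"])
out2 = drop_duplicates(list(reversed(rows)), ["_c3"])
assert {r["_c3"] for r in out1} == {r["_c3"] for r in out2} == {"A", "B"}
assert [r["_c5"] for r in out1] != [r["_c5"] for r in out2]
```

In Spark the "arrival order" varies with partitioning and task scheduling, so a filter applied after dropDuplicates can match a different number of surviving rows on each action, exactly as in the console output above.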
[jira] [Commented] (SPARK-26777) SQL worked in 2.3.2 and fails in 2.4.0
[ https://issues.apache.org/jira/browse/SPARK-26777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755749#comment-16755749 ] Hyukjin Kwon commented on SPARK-26777: -- Please narrow down the problem and describe your analysis, with a self-contained reproducer if possible. As filed, this just requests an investigation. Please reopen after adding a sufficiently narrowed-down analysis and a self-contained reproducer. > SQL worked in 2.3.2 and fails in 2.4.0 > -- > > Key: SPARK-26777 > URL: https://issues.apache.org/jira/browse/SPARK-26777 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Yuri Budilov >Priority: Major > > The following SQL worked in Spark 2.3.2 and now fails on 2.4.0 (AWS EMR Spark). > PySpark call below: > spark.sql("select partition_year_utc,partition_month_utc,partition_day_utc \ > from datalake_reporting.copy_of_leads_notification \ > where partition_year_utc = (select max(partition_year_utc) from > datalake_reporting.copy_of_leads_notification) \ > and partition_month_utc = \ > (select max(partition_month_utc) from > datalake_reporting.copy_of_leads_notification as m \ > where \ > m.partition_year_utc = (select max(partition_year_utc) from > datalake_reporting.copy_of_leads_notification)) \ > and partition_day_utc = (select max(d.partition_day_utc) from > datalake_reporting.copy_of_leads_notification as d \ > where d.partition_month_utc = \ > (select max(m1.partition_month_utc) from > datalake_reporting.copy_of_leads_notification as m1 \ > where m1.partition_year_utc = \ > (select max(y.partition_year_utc) from > datalake_reporting.copy_of_leads_notification as y) \ > ) \ > ) \ > order by 1 desc, 2 desc, 3 desc limit 1 ").show(1,False) > Error (no data is needed; this fails during analysis): > py4j.protocol.Py4JJavaError: An error occurred while calling o1326.showString. 
> : java.lang.UnsupportedOperationException: Cannot evaluate expression: > scalar-subquery#4495 [] > > Note: all 3 columns in query are Partitioned columns - see bottom of the > schema) > > Hive EMR AWS Schema is: > > CREATE EXTERNAL TABLE `copy_of_leads_notification`( > `message.environment.siteorigin` string, `dcpheader.dcploaddateutc` string, > `message.id` int, `source.properties._country` string, `message.created` > string, `dcpheader.generatedmessageid` string, `message.tags` bigint, > `source.properties._enqueuedtimeutc` string, `source.properties._leadtype` > string, `message.itemid` string, `message.prospect.postcode` string, > `message.prospect.email` string, `message.referenceid` string, > `message.item.year` string, `message.identifier` string, > `dcpheader.dcploadmonthutc` string, `message.processed` string, > `source.properties._tenant` string, `message.item.price` string, > `message.subscription.confirmresponse` boolean, `message.itemtype` string, > `message.prospect.lastname` string, `message.subscription.insurancequote` > boolean, `source.exchangename` string, > `message.prospect.identificationnumbers` bigint, > `message.environment.ipaddress` string, `dcpheader.dcploaddayutc` string, > `source.properties._itemtype` string, `source.properties._requesttype` > string, `message.item.make` string, `message.prospect.firstname` string, > `message.subscription.survey` boolean, `message.prospect.homephone` string, > `message.extendedproperties` bigint, `message.subscription.financequote` > boolean, `message.uniqueidentifier` string, `source.properties._id` string, > `dcpheader.sourcemessageguid` string, `message.requesttype` string, > `source.routingkey` string, `message.service` string, `message.item.model` > string, `message.environment.pagesource` string, `source.source` string, > `message.sellerid` string, `partition_date_utc` string, > `message.selleridentifier` string, `message.subscription.newsletter` boolean, > `dcpheader.dcploadyearutc` string, 
`message.leadtype` string, > `message.history` bigint, `message.callconnect.calloutcome` string, > `message.callconnect.datecreatedutc` string, > `message.callconnect.callrecordingurl` string, > `message.callconnect.transferoutcome` string, > `message.callconnect.hiderecording` boolean, > `message.callconnect.callstartutc` string, `message.callconnect.code` string, > `message.callconnect.callduration` string, `message.fraudnetinfo` string, > `message.callconnect.answernumber` string, `message.environment.sourcedevice` > string, `message.comments` string, `message.fraudinfo.servervariables` > bigint, `message.callconnect.servicenumber` string, > `message.callconnect.callid` string, `message.callconnect.voicemailurl` > string, `message.item.stocknumber` string, > `message.callconnect.answerduration` string, `message.callconnect.callendutc` > string, `message.item.series`
[jira] [Resolved] (SPARK-26777) SQL worked in 2.3.2 and fails in 2.4.0
[ https://issues.apache.org/jira/browse/SPARK-26777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26777. -- Resolution: Incomplete > SQL worked in 2.3.2 and fails in 2.4.0 > -- > > Key: SPARK-26777 > URL: https://issues.apache.org/jira/browse/SPARK-26777 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Yuri Budilov >Priority: Major > > Following SQL worked in Spark 2.3.2 and now fails on 2.4.0 (AWS EMR Spark) > PySpark call below: > spark.sql("select partition_year_utc,partition_month_utc,partition_day_utc \ > from datalake_reporting.copy_of_leads_notification \ > where partition_year_utc = (select max(partition_year_utc) from > datalake_reporting.copy_of_leads_notification) \ > and partition_month_utc = \ > (select max(partition_month_utc) from > datalake_reporting.copy_of_leads_notification as m \ > where \ > m.partition_year_utc = (select max(partition_year_utc) from > datalake_reporting.copy_of_leads_notification)) \ > and partition_day_utc = (select max(d.partition_day_utc) from > datalake_reporting.copy_of_leads_notification as d \ > where d.partition_month_utc = \ > (select max(m1.partition_month_utc) from > datalake_reporting.copy_of_leads_notification as m1 \ > where m1.partition_year_utc = \ > (select max(y.partition_year_utc) from > datalake_reporting.copy_of_leads_notification as y) \ > ) \ > ) \ > order by 1 desc, 2 desc, 3 desc limit 1 ").show(1,False) > Error: (no need for data, this is syntax). > py4j.protocol.Py4JJavaError: An error occurred while calling o1326.showString. 
> : java.lang.UnsupportedOperationException: Cannot evaluate expression: > scalar-subquery#4495 [] > > Note: all 3 columns in query are Partitioned columns - see bottom of the > schema) > > Hive EMR AWS Schema is: > > CREATE EXTERNAL TABLE `copy_of_leads_notification`( > `message.environment.siteorigin` string, `dcpheader.dcploaddateutc` string, > `message.id` int, `source.properties._country` string, `message.created` > string, `dcpheader.generatedmessageid` string, `message.tags` bigint, > `source.properties._enqueuedtimeutc` string, `source.properties._leadtype` > string, `message.itemid` string, `message.prospect.postcode` string, > `message.prospect.email` string, `message.referenceid` string, > `message.item.year` string, `message.identifier` string, > `dcpheader.dcploadmonthutc` string, `message.processed` string, > `source.properties._tenant` string, `message.item.price` string, > `message.subscription.confirmresponse` boolean, `message.itemtype` string, > `message.prospect.lastname` string, `message.subscription.insurancequote` > boolean, `source.exchangename` string, > `message.prospect.identificationnumbers` bigint, > `message.environment.ipaddress` string, `dcpheader.dcploaddayutc` string, > `source.properties._itemtype` string, `source.properties._requesttype` > string, `message.item.make` string, `message.prospect.firstname` string, > `message.subscription.survey` boolean, `message.prospect.homephone` string, > `message.extendedproperties` bigint, `message.subscription.financequote` > boolean, `message.uniqueidentifier` string, `source.properties._id` string, > `dcpheader.sourcemessageguid` string, `message.requesttype` string, > `source.routingkey` string, `message.service` string, `message.item.model` > string, `message.environment.pagesource` string, `source.source` string, > `message.sellerid` string, `partition_date_utc` string, > `message.selleridentifier` string, `message.subscription.newsletter` boolean, > `dcpheader.dcploadyearutc` string, 
`message.leadtype` string, > `message.history` bigint, `message.callconnect.calloutcome` string, > `message.callconnect.datecreatedutc` string, > `message.callconnect.callrecordingurl` string, > `message.callconnect.transferoutcome` string, > `message.callconnect.hiderecording` boolean, > `message.callconnect.callstartutc` string, `message.callconnect.code` string, > `message.callconnect.callduration` string, `message.fraudnetinfo` string, > `message.callconnect.answernumber` string, `message.environment.sourcedevice` > string, `message.comments` string, `message.fraudinfo.servervariables` > bigint, `message.callconnect.servicenumber` string, > `message.callconnect.callid` string, `message.callconnect.voicemailurl` > string, `message.item.stocknumber` string, > `message.callconnect.answerduration` string, `message.callconnect.callendutc` > string, `message.item.series` string, `message.item.detailsurl` string, > `message.item.pricetype` string, `message.item.description` string, > `message.item.colour` string, `message.item.badge` string, > `message.item.odometer` string, `message.environment.requestheader` string, >
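The scalar-subquery failure above comes from the stack of nested max() subqueries used to locate the latest (year, month, day) partition. A hedged workaround sketch, demonstrated against an in-memory SQLite table with invented data rather than the Hive table from the report: ordering by the partition columns and taking one row yields the same answer as the nested maxima, without any scalar-subquery expression.

```python
import sqlite3

# Hypothetical table and data for illustration; column names are shortened
# stand-ins for partition_year_utc / partition_month_utc / partition_day_utc.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE parts (y INT, m INT, d INT)")
conn.executemany("INSERT INTO parts VALUES (?,?,?)",
                 [(2018, 12, 31), (2019, 1, 15), (2019, 1, 30)])

# The nested-scalar-subquery shape from the report.
nested = conn.execute("""
    SELECT y, m, d FROM parts
    WHERE y = (SELECT MAX(y) FROM parts)
      AND m = (SELECT MAX(m) FROM parts WHERE y = (SELECT MAX(y) FROM parts))
      AND d = (SELECT MAX(d) FROM parts
               WHERE y = (SELECT MAX(y) FROM parts)
                 AND m = (SELECT MAX(m) FROM parts
                          WHERE y = (SELECT MAX(y) FROM parts)))
""").fetchone()

# Equivalent single scan: sort the partition triple descending, keep one row.
flat = conn.execute(
    "SELECT y, m, d FROM parts ORDER BY y DESC, m DESC, d DESC LIMIT 1"
).fetchone()

assert nested == flat
```

Whether Spark 2.4.0's refusal to evaluate the scalar subquery here is a regression still needs the narrowed-down analysis requested in the comment; the rewrite is only a way to sidestep the failing expression.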
[jira] [Resolved] (SPARK-26779) NullPointerException when disabling whole-stage codegen
[ https://issues.apache.org/jira/browse/SPARK-26779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26779. -- Resolution: Incomplete Please reopen this after filling in the requested details > NullPointerException when disabling whole-stage codegen > > > Key: SPARK-26779 > URL: https://issues.apache.org/jira/browse/SPARK-26779 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiaoju Wu >Priority: Trivial > > When running TPCDSSuite with whole-stage codegen disabled, an NPE is thrown in q9: > java.lang.NullPointerException at > org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:170) > at > org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:613) > at > org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:160) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.immutable.List.map(List.scala:285) at > org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) > at > 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.immutable.List.map(List.scala:285) at > org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.immutable.List.map(List.scala:285) at > org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.immutable.List.map(List.scala:285) at > org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at >
[jira] [Commented] (SPARK-26779) NullPointerException when disabling whole-stage codegen
[ https://issues.apache.org/jira/browse/SPARK-26779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755746#comment-16755746 ] Hyukjin Kwon commented on SPARK-26779: -- Please don't just copy and paste the stack trace. Describe your analysis: whether the failure is persistent, and how to reproduce it step by step. > NullPointerException when disabling whole-stage codegen > > > Key: SPARK-26779 > URL: https://issues.apache.org/jira/browse/SPARK-26779 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiaoju Wu >Priority: Trivial > > When running TPCDSSuite with whole-stage codegen disabled, an NPE is thrown in q9: > java.lang.NullPointerException at > org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:170) > at > org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:613) > at > org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:160) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.immutable.List.map(List.scala:285) at > org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) > at > 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.immutable.List.map(List.scala:285) at > org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.immutable.List.map(List.scala:285) at > org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:381) at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at > scala.collection.immutable.List.map(List.scala:285) at > org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) > at >
[jira] [Created] (SPARK-26779) NullPointerException when disabling whole-stage codegen
Xiaoju Wu created SPARK-26779: - Summary: NullPointerException when disable wholestage codegen Key: SPARK-26779 URL: https://issues.apache.org/jira/browse/SPARK-26779 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0 Reporter: Xiaoju Wu When running TPCDSSuite with wholestage codegen disabled, NPE is thrown in q9: java.lang.NullPointerException at org.apache.spark.sql.execution.FileSourceScanExec.(DataSourceScanExec.scala:170) at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:613) at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:160) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at 
scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210) at
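The repeated canonicalized$lzycompute/doCanonicalize frames in the trace above are just QueryPlan.canonicalized recursing through the plan tree; the NPE itself originates in the FileSourceScanExec constructor invoked during canonicalization. A hypothetical Python sketch (names and structure invented, not Spark's actual implementation) of how a lazily memoized, recursive canonicalization surfaces one missing field in a leaf node under a deep stack of identical frames:

```python
from functools import cached_property

class QueryPlan:
    def __init__(self, children=()):
        self.children = list(children)

    @cached_property
    def canonicalized(self):       # analogous to canonicalized$lzycompute
        return self.do_canonicalize()

    def do_canonicalize(self):
        # Recurse: canonicalize every child first (the repeated frames).
        return type(self)(c.canonicalized for c in self.children)

class FileSourceScan(QueryPlan):
    def __init__(self, relation=None):
        super().__init__()
        self.relation = relation

    def do_canonicalize(self):
        # Dereferencing a field that was never set fails only when
        # canonicalization is first triggered, far from the real cause.
        return FileSourceScan(relation=self.relation.name)

plan = QueryPlan([QueryPlan([FileSourceScan(relation=None)])])
err = None
try:
    plan.canonicalized
except AttributeError as e:        # Python's analogue of the NPE
    err = e
```

This is only a model of the call pattern in the trace; the actual fix would need to identify which FileSourceScanExec constructor argument is null when whole-stage codegen is disabled.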
[jira] [Created] (SPARK-26778) Remove rule `FallbackOrcDataSourceV2` when catalog support of file data source v2 is finished
Gengliang Wang created SPARK-26778: -- Summary: Remove rule `FallbackOrcDataSourceV2` when catalog support of file data source v2 is finished Key: SPARK-26778 URL: https://issues.apache.org/jira/browse/SPARK-26778 Project: Spark Issue Type: Task Components: SQL Affects Versions: 3.0.0 Reporter: Gengliang Wang
[jira] [Comment Edited] (SPARK-25420) Dataset.count() every time is different.
[ https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755709#comment-16755709 ] Jeffrey edited comment on SPARK-25420 at 1/30/19 5:59 AM: -- [~kabhwan] I cannot share the dataset since it is owned by my clients. I could elaborate more on the scenarios: >>drkcard_0_df = >>spark.read.csv("""[wasbs://e...@okprodstorage.blob.core.windows.net/AAA/WICTW/raw/TXN/POS/2018/09/**/*.gz]""") The dataset carries > 100,000 of records. >>drkcard_0_df.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and >>_c2='83' and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John| |2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|Tom| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel| |2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|James| |2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence| >>dropDup_0=drkcard_0_df.dropDuplicates(["_c0","_c1","_c2","_c3","_c4"]) Running the where for 1st time: {color:#ff}Bug: Group C was mistakenly dropped.{color} >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and >>_c2='83' and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary| Running the where again: {color:#ff}Acceptable result{color} >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and _c2='83' and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel| |2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence| Running the where again:{color:#ff}Bug: Group A and C were mistakenly 
dropped.{color} >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and _c2='83' and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel| Continue to repeat running the where statement, the number of rows keep changing. was (Author: jeffrey.mak): [~kabhwan] I cannot share the dataset since it is owned by my clients. I could elaborate more on the scenarios: >>drkcard_0_df = >>spark.read.csv("""[wasbs://e...@okprodstorage.blob.core.windows.net/AAA/WICTW/raw/TXN/POS/2018/09/**/*.gz|wasbs://l...@aswprodeastorage.blob.core.windows.net/ASW/WTCTW/raw/TlogParser/PARSED_TLOG/2018/09/**/*.gz]""") The dataset carries > 100,000 of records. >>drkcard_0_df.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and >>_c2='83' and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John| |2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|Tom| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel| |2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|James| |2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence| >>dropDup_0=drkcard_0_df.dropDuplicates(["_c0","_c1","_c2","_c3","_c4"]) Running the where for 1st time: {color:#FF}Bug: Group C was mistakenly dropped.{color} >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and _c2='83' >>and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary| Running the where again: {color:#FF}Acceptable result{color} >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and _c2='83' and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 
00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel| |2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence| Running the where again:{color:#FF}Bug: Group A and C were mistakenly dropped.{color} >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and _c2='83' and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel| Continuing to repeat the where statement, the number of rows keeps changing. > Dataset.count() every time is different. > - > > Key:
[jira] [Updated] (SPARK-26778) Implement file source V2 partitioning
[ https://issues.apache.org/jira/browse/SPARK-26778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang updated SPARK-26778: --- Summary: Implement file source V2 partitioning (was: Remove rule `FallbackOrcDataSourceV2` when catalog support of file data source v2 is finished) > Implement file source V2 partitioning > -- > > Key: SPARK-26778 > URL: https://issues.apache.org/jira/browse/SPARK-26778 > Project: Spark > Issue Type: Task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Priority: Major > 
[jira] [Commented] (SPARK-25420) Dataset.count() every time is different.
[ https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755709#comment-16755709 ] Jeffrey commented on SPARK-25420: - [~kabhwan] I cannot share the dataset since it is owned by my clients, but I can elaborate on the scenario: >>drkcard_0_df = >>spark.read.csv("""[wasbs://e...@okprodstorage.blob.core.windows.net/AAA/WICTW/raw/TXN/POS/2018/09/**/*.gz|wasbs://l...@aswprodeastorage.blob.core.windows.net/ASW/WTCTW/raw/TlogParser/PARSED_TLOG/2018/09/**/*.gz]""") The dataset carries more than 100,000 records. >>drkcard_0_df.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and >>_c2='83' and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John| |2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|Tom| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel| |2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|James| |2018-09-21 00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence| >>dropDup_0=drkcard_0_df.dropDuplicates(["_c0","_c1","_c2","_c3","_c4"]) Running the where for the 1st time: {color:#FF}Bug: Group C was mistakenly dropped.{color} >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and _c2='83' >>and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mary| Running the where again: {color:#FF}Acceptable result{color} >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and _c2='83' and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 00:00:00|TDT_DSC_ITM|83|A|180919212732008200218|John| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel| |2018-09-21 
00:00:00|TDT_DSC_ITM|83|C|180919212732008200218|Laurence| Running the where again: {color:#FF}Bug: Group A and C were mistakenly dropped.{color} >>dropDup_0.where("_c0='2018-09-21 00:00:00' and _c1='TDT_DSC_ITM' and _c2='83' and _c4='180919212732008200218'").show(2000,False) |_c0|_c1|_c2|_c3|_c4|_c5| |2018-09-21 00:00:00|TDT_DSC_ITM|83|B|180919212732008200218|Mabel| Continuing to repeat the where statement, the number of rows keeps changing. > Dataset.count() every time is different. > - > > Key: SPARK-25420 > URL: https://issues.apache.org/jira/browse/SPARK-25420 > Project: Spark > Issue Type: Question > Components: Spark Core >Affects Versions: 2.3.0 > Environment: spark2.3 > standalone >Reporter: huanghuai >Priority: Major > Labels: SQL > > Dataset dataset = sparkSession.read().format("csv").option("sep", > ",").option("inferSchema", "true") > .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true") > .option("encoding", "UTF-8") > .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv"); > System.out.println("source count="+dataset.count()); > Dataset dropDuplicates = dataset.dropDuplicates(new > String[]\{"DATE","TIME","VEL","COMPANY"}); > System.out.println("dropDuplicates count1="+dropDuplicates.count()); > System.out.println("dropDuplicates count2="+dropDuplicates.count()); > Dataset filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 > and (status = 0 or status = 1)"); > System.out.println("filter count1="+filter.count()); > System.out.println("filter count2="+filter.count()); > System.out.println("filter count3="+filter.count()); > System.out.println("filter count4="+filter.count()); > System.out.println("filter count5="+filter.count()); > > > --The above is code > --- > > > console output: > source count=459275 > dropDuplicates count1=453987 > dropDuplicates count2=453987 > filter count1=445798 > filter count2=445797 > filter count3=445797 > filter count4=445798 > filter count5=445799 > > question: > > Why is 
filter.count() different everytime? > if I remove dropDuplicates() everything will be ok!! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26777) SQL worked in 2.3.2 and fails in 2.4.0
Yuri Budilov created SPARK-26777: Summary: SQL worked in 2.3.2 and fails in 2.4.0 Key: SPARK-26777 URL: https://issues.apache.org/jira/browse/SPARK-26777 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.0 Reporter: Yuri Budilov Following SQL worked in Spark 2.3.2 and now fails on 2.4.0 (AWS EMR Spark) PySpark call below: spark.sql("select partition_year_utc,partition_month_utc,partition_day_utc \ from datalake_reporting.copy_of_leads_notification \ where partition_year_utc = (select max(partition_year_utc) from datalake_reporting.copy_of_leads_notification) \ and partition_month_utc = \ (select max(partition_month_utc) from datalake_reporting.copy_of_leads_notification as m \ where \ m.partition_year_utc = (select max(partition_year_utc) from datalake_reporting.copy_of_leads_notification)) \ and partition_day_utc = (select max(d.partition_day_utc) from datalake_reporting.copy_of_leads_notification as d \ where d.partition_month_utc = \ (select max(m1.partition_month_utc) from datalake_reporting.copy_of_leads_notification as m1 \ where m1.partition_year_utc = \ (select max(y.partition_year_utc) from datalake_reporting.copy_of_leads_notification as y) \ ) \ ) \ order by 1 desc, 2 desc, 3 desc limit 1 ").show(1,False) Error: (no need for data, this is syntax). py4j.protocol.Py4JJavaError: An error occurred while calling o1326.showString. 
: java.lang.UnsupportedOperationException: Cannot evaluate expression: scalar-subquery#4495 [] Note: all 3 columns in query are Partitioned columns - see bottom of the schema) Hive EMR AWS Schema is: CREATE EXTERNAL TABLE `copy_of_leads_notification`( `message.environment.siteorigin` string, `dcpheader.dcploaddateutc` string, `message.id` int, `source.properties._country` string, `message.created` string, `dcpheader.generatedmessageid` string, `message.tags` bigint, `source.properties._enqueuedtimeutc` string, `source.properties._leadtype` string, `message.itemid` string, `message.prospect.postcode` string, `message.prospect.email` string, `message.referenceid` string, `message.item.year` string, `message.identifier` string, `dcpheader.dcploadmonthutc` string, `message.processed` string, `source.properties._tenant` string, `message.item.price` string, `message.subscription.confirmresponse` boolean, `message.itemtype` string, `message.prospect.lastname` string, `message.subscription.insurancequote` boolean, `source.exchangename` string, `message.prospect.identificationnumbers` bigint, `message.environment.ipaddress` string, `dcpheader.dcploaddayutc` string, `source.properties._itemtype` string, `source.properties._requesttype` string, `message.item.make` string, `message.prospect.firstname` string, `message.subscription.survey` boolean, `message.prospect.homephone` string, `message.extendedproperties` bigint, `message.subscription.financequote` boolean, `message.uniqueidentifier` string, `source.properties._id` string, `dcpheader.sourcemessageguid` string, `message.requesttype` string, `source.routingkey` string, `message.service` string, `message.item.model` string, `message.environment.pagesource` string, `source.source` string, `message.sellerid` string, `partition_date_utc` string, `message.selleridentifier` string, `message.subscription.newsletter` boolean, `dcpheader.dcploadyearutc` string, `message.leadtype` string, `message.history` bigint, 
`message.callconnect.calloutcome` string, `message.callconnect.datecreatedutc` string, `message.callconnect.callrecordingurl` string, `message.callconnect.transferoutcome` string, `message.callconnect.hiderecording` boolean, `message.callconnect.callstartutc` string, `message.callconnect.code` string, `message.callconnect.callduration` string, `message.fraudnetinfo` string, `message.callconnect.answernumber` string, `message.environment.sourcedevice` string, `message.comments` string, `message.fraudinfo.servervariables` bigint, `message.callconnect.servicenumber` string, `message.callconnect.callid` string, `message.callconnect.voicemailurl` string, `message.item.stocknumber` string, `message.callconnect.answerduration` string, `message.callconnect.callendutc` string, `message.item.series` string, `message.item.detailsurl` string, `message.item.pricetype` string, `message.item.description` string, `message.item.colour` string, `message.item.badge` string, `message.item.odometer` string, `message.environment.requestheader` string, `message.item.registrationnumber` string, `message.item.bodytype` string, `message.item.fueltype` string, `message.item.redbookcode` string, `message.item.spotid` string, `message.item.id` string, `message.item.transmission` string, `message.item.vin` string, `message.item.enginedescription` string, `message.prospect.mobilephone` string,
[jira] [Assigned] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check
[ https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-26776: --- Assignee: Hyukjin Kwon > Reduce Py4J communication cost in PySpark's execution barrier check > --- > > Key: SPARK-26776 > URL: https://issues.apache.org/jira/browse/SPARK-26776 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.0, 3.0.0 >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon >Priority: Minor > > I am investigating flaky tests. I realised that: > {code} > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2512, in __init__ > self.is_barrier = prev._is_barrier() or isFromBarrier > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2412, in _is_barrier > return self._jrdd.rdd().isBarrier() > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", > line 342, in get_return_value > return OUTPUT_CONVERTER[type](answer[2:], gateway_client) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 2492, in > lambda target_id, gateway_client: JavaObject(target_id, > gateway_client)) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1324, in __init__ > ThreadSafeFinalizer.add_finalizer(key, value) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", > line 43, in add_finalizer > cls.finalizers[id] = weak_ref > File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in > __exit__ > self.release() > File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, 
in > release > self.__block.release() > error: release unlocked lock > {code} > I assume it might not be directly related to the test itself, but I noticed > that prev._is_barrier() attempts to access the JVM via Py4J. > Accessing via Py4J is expensive and IMHO it makes the test flaky. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
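The cost pattern described in this issue can be sketched without Spark at all. The class and attribute names below are hypothetical (this is not the actual PySpark patch); it only illustrates the caching shape the issue implies: compute the JVM-backed flag at most once, short-circuit on the local flag, and serve repeated checks from the cached Python value.

```python
class CachedBarrierRDD:
    """Toy stand-in for an RDD wrapper whose barrier flag lives in the JVM."""

    def __init__(self, prev=None, is_from_barrier=False):
        self.prev = prev
        self.is_from_barrier = is_from_barrier
        self.jvm_calls = 0            # counts simulated Py4J round-trips
        self._cached_is_barrier = None

    def _jvm_is_barrier(self):
        # In real PySpark this would be self._jrdd.rdd().isBarrier(), i.e.
        # one Py4J round-trip per call; here we just count the calls.
        self.jvm_calls += 1
        return self.prev.is_barrier() if self.prev is not None else False

    def is_barrier(self):
        # Short-circuit on the local flag, otherwise ask the "JVM" once and
        # answer every later call from the cached Python-side value.
        if self._cached_is_barrier is None:
            self._cached_is_barrier = self.is_from_barrier or self._jvm_is_barrier()
        return self._cached_is_barrier
```

After the first check, repeated `is_barrier()` calls cost zero round-trips, which is the kind of saving being proposed.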
[jira] [Resolved] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check
[ https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-26776. - Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23690 [https://github.com/apache/spark/pull/23690] > Reduce Py4J communication cost in PySpark's execution barrier check > --- > > Key: SPARK-26776 > URL: https://issues.apache.org/jira/browse/SPARK-26776 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.0, 3.0.0 >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon >Priority: Minor > Fix For: 3.0.0 > > > I am investigating flaky tests. I realised that: > {code} > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2512, in __init__ > self.is_barrier = prev._is_barrier() or isFromBarrier > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2412, in _is_barrier > return self._jrdd.rdd().isBarrier() > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", > line 342, in get_return_value > return OUTPUT_CONVERTER[type](answer[2:], gateway_client) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 2492, in > lambda target_id, gateway_client: JavaObject(target_id, > gateway_client)) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1324, in __init__ > ThreadSafeFinalizer.add_finalizer(key, value) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", > line 43, in add_finalizer > cls.finalizers[id] = weak_ref > File 
"/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in > __exit__ > self.release() > File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in > release > self.__block.release() > error: release unlocked lock > {code} > I assume it might not be directly related with the test itself but I noticed > that it prev._is_barrier() attempts to access via Py4J. > Accessing via Py4J is expensive and IMHO it makes it flaky. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26726) Synchronize the amount of memory used by the broadcast variable to the UI display
[ https://issues.apache.org/jira/browse/SPARK-26726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hantiantian updated SPARK-26726: Summary: Synchronize the amount of memory used by the broadcast variable to the UI display (was: The amount of memory used by the broadcast variable is not synchronized to the UI display) > Synchronize the amount of memory used by the broadcast variable to the UI > display > --- > > Key: SPARK-26726 > URL: https://issues.apache.org/jira/browse/SPARK-26726 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: hantiantian >Priority: Major > > The amount of memory used by the broadcast variable is not synchronized to > the UI display, > spark-sql> select /*+ broadcast(a)*/ a.id,b.id from a join b on a.id = b.id; > View the app's driver log: > 2019-01-25 16:45:23,726 INFO org.apache.spark.storage.BlockManagerInfo: Added > broadcast_4_piece0 in memory on 10.43.xx.xx:33907 (size: 6.6 KB, free: 2.5 GB) > 2019-01-25 16:45:23,727 INFO org.apache.spark.storage.BlockManagerInfo: > Added broadcast_4_piece0 in memory on 10.43.xx.xx:38399 (size: 6.6 KB, free: > 2.5 GB) > 2019-01-25 16:45:23,745 INFO org.apache.spark.storage.BlockManagerInfo: > Added broadcast_3_piece0 in memory on 10.43.xx.xx:33907 (size: 32.1 KB, free: > 2.5 GB) > 2019-01-25 16:45:23,749 INFO org.apache.spark.storage.BlockManagerInfo: > Added broadcast_3_piece0 in memory on 10.43.xx.xx:38399 (size: 32.1 KB, free: > 2.5 GB) > 2019-01-25 16:45:23,838 INFO org.apache.spark.storage.BlockManagerInfo: > Added broadcast_2_piece0 in memory on 10.43.xx.xx:38399 (size: 147.0 B, free: > 2.5 GB) > 2019-01-25 16:45:23,840 INFO org.apache.spark.storage.BlockManagerInfo: > Added broadcast_2_piece0 in memory on 10.43.xx.xx:33907 (size: 147.0 B, free: > 2.5 GB) > > Web UI does not have the use of memory, > ||Executor ID||Address||Status||RDD Blocks||Storage Memory||Disk > Used||Cores||Active Tasks||Failed Tasks||Complete Tasks||Total 
Tasks||Task > Time (GC Time)||Input||Shuffle Read||Shuffle Write||Logs||Thread Dump|| > |0|xxx:38399|Active|0|0.0 B / 2.7 GB|0.0 B|1|0|0|2|2|4 s (0.4 s)|8 B|0.0 > B|0.0 > B|[stdout|http://10.43.183.120:18085/logPage/?appId=app-20190125164426-0003=0=stdout] > > [stderr|http://10.43.183.120:18085/logPage/?appId=app-20190125164426-0003=0=stderr]|[Thread > Dump|http://10.43.183.121:4040/executors/threadDump/?executorId=0]| > |driver|xxx:47936|Active|0|0.0 B / 384.1 MB|0.0 B|0|0|0|0|0|0.0 ms (0.0 > ms)|0.0 B|0.0 B|0.0 B| |[Thread > Dump|http://10.43.183.121:4040/executors/threadDump/?executorId=driver]| > |1|xxx:47414|Active|0|0.0 B / 2.7 GB|0.0 B|1|0|0|0|0|0.0 ms (0.0 ms)|0.0 > B|0.0 B|0.0 > B|[stdout|http://10.43.183.121:18085/logPage/?appId=app-20190125164426-0003=1=stdout] > > [stderr|http://10.43.183.121:18085/logPage/?appId=app-20190125164426-0003=1=stderr]|[Thread > Dump|http://10.43.183.121:4040/executors/threadDump/?executorId=1]| > |2|xxx:33907|Active|0|0.0 B / 2.7 GB|0.0 B|1|0|0|2|2|4 s (0.2 s)|4 B|0.0 > B|0.0 > B|[stdout|http://10.43.183.122:18085/logPage/?appId=app-20190125164426-0003=2=stdout] > > [stderr|http://10.43.183.122:18085/logPage/?appId=app-20190125164426-0003=2=stderr]|[Thread > Dump|http://10.43.183.121:4040/executors/threadDump/?executorId=2]| > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
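While investigating what the Executors page renders, the same numbers are exposed by Spark's monitoring REST API; `memoryUsed` in the executors payload backs the "Storage Memory" column, which is where the broadcast pieces from the log above should eventually be reflected. A minimal helper (field names are from the documented `ExecutorSummary` payload; the sample values are invented for illustration):

```python
import json

def storage_memory(executors_payload: str) -> dict:
    """Map executor id -> memoryUsed bytes, given the JSON body returned by
    GET /api/v1/applications/<app-id>/executors on the driver UI (port 4040
    by default)."""
    return {ex["id"]: ex["memoryUsed"] for ex in json.loads(executors_payload)}

# Trimmed sample response: real field names, invented values.
sample = json.dumps([
    {"id": "driver", "memoryUsed": 0, "maxMemory": 402714624},
    {"id": "0", "memoryUsed": 39608, "maxMemory": 2897000000},
])
print(storage_memory(sample))
```

Comparing this value before and after the broadcast join makes it easy to tell whether the accounting or only the UI is out of sync.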
[jira] [Assigned] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check
[ https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26776: Assignee: Apache Spark > Reduce Py4J communication cost in PySpark's execution barrier check > --- > > Key: SPARK-26776 > URL: https://issues.apache.org/jira/browse/SPARK-26776 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.0, 3.0.0 >Reporter: Hyukjin Kwon >Assignee: Apache Spark >Priority: Minor > > I am investigating flaky tests. I realised that: > {code} > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2512, in __init__ > self.is_barrier = prev._is_barrier() or isFromBarrier > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2412, in _is_barrier > return self._jrdd.rdd().isBarrier() > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", > line 342, in get_return_value > return OUTPUT_CONVERTER[type](answer[2:], gateway_client) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 2492, in > lambda target_id, gateway_client: JavaObject(target_id, > gateway_client)) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1324, in __init__ > ThreadSafeFinalizer.add_finalizer(key, value) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", > line 43, in add_finalizer > cls.finalizers[id] = weak_ref > File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in > __exit__ > self.release() > File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in 
> release > self.__block.release() > error: release unlocked lock > {code} > I assume it might not be directly related to the test itself, but I noticed > that prev._is_barrier() attempts to access the JVM via Py4J. > Accessing via Py4J is expensive and IMHO it makes the test flaky. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check
Hyukjin Kwon created SPARK-26776: Summary: Reduce Py4J communication cost in PySpark's execution barrier check Key: SPARK-26776 URL: https://issues.apache.org/jira/browse/SPARK-26776 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.4.0, 3.0.0 Reporter: Hyukjin Kwon I am investigating flaky tests. I realised that: {code} File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2512, in __init__ self.is_barrier = prev._is_barrier() or isFromBarrier File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2412, in _is_barrier return self._jrdd.rdd().isBarrier() File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 342, in get_return_value return OUTPUT_CONVERTER[type](answer[2:], gateway_client) File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 2492, in lambda target_id, gateway_client: JavaObject(target_id, gateway_client)) File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1324, in __init__ ThreadSafeFinalizer.add_finalizer(key, value) File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", line 43, in add_finalizer cls.finalizers[id] = weak_ref File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in __exit__ self.release() File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in release self.__block.release() error: release unlocked lock {code} I assume it might not be directly related with the test itself but I noticed that it prev._is_barrier() attempts to access via Py4J. Accessing via Py4J is expensive and IMHO it makes it flaky. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check
[ https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755605#comment-16755605 ] Apache Spark commented on SPARK-26776: -- User 'HyukjinKwon' has created a pull request for this issue: https://github.com/apache/spark/pull/23690 > Reduce Py4J communication cost in PySpark's execution barrier check > --- > > Key: SPARK-26776 > URL: https://issues.apache.org/jira/browse/SPARK-26776 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.0, 3.0.0 >Reporter: Hyukjin Kwon >Priority: Minor > > I am investigating flaky tests. I realised that: > {code} > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2512, in __init__ > self.is_barrier = prev._is_barrier() or isFromBarrier > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2412, in _is_barrier > return self._jrdd.rdd().isBarrier() > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", > line 342, in get_return_value > return OUTPUT_CONVERTER[type](answer[2:], gateway_client) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 2492, in > lambda target_id, gateway_client: JavaObject(target_id, > gateway_client)) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1324, in __init__ > ThreadSafeFinalizer.add_finalizer(key, value) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", > line 43, in add_finalizer > cls.finalizers[id] = weak_ref > File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in > 
__exit__ > self.release() > File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in > release > self.__block.release() > error: release unlocked lock > {code} > I assume it might not be directly related to the test itself, but I noticed > that prev._is_barrier() attempts to access the JVM via Py4J. > Accessing via Py4J is expensive and IMHO it makes the test flaky. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check
[ https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26776: Assignee: (was: Apache Spark) > Reduce Py4J communication cost in PySpark's execution barrier check > --- > > Key: SPARK-26776 > URL: https://issues.apache.org/jira/browse/SPARK-26776 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.0, 3.0.0 >Reporter: Hyukjin Kwon >Priority: Minor > > I am investigating flaky tests. I realised that: > {code} > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2512, in __init__ > self.is_barrier = prev._is_barrier() or isFromBarrier > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2412, in _is_barrier > return self._jrdd.rdd().isBarrier() > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", > line 342, in get_return_value > return OUTPUT_CONVERTER[type](answer[2:], gateway_client) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 2492, in > lambda target_id, gateway_client: JavaObject(target_id, > gateway_client)) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1324, in __init__ > ThreadSafeFinalizer.add_finalizer(key, value) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", > line 43, in add_finalizer > cls.finalizers[id] = weak_ref > File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in > __exit__ > self.release() > File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in > release > 
self.__block.release() > error: release unlocked lock > {code} > I assume it might not be directly related to the test itself, but I noticed > that prev._is_barrier() attempts to access the JVM via Py4J. > Accessing via Py4J is expensive and IMHO it makes the test flaky. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26776) Reduce Py4J communication cost in PySpark's execution barrier check
[ https://issues.apache.org/jira/browse/SPARK-26776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755602#comment-16755602 ] Apache Spark commented on SPARK-26776: -- User 'HyukjinKwon' has created a pull request for this issue: https://github.com/apache/spark/pull/23690 > Reduce Py4J communication cost in PySpark's execution barrier check > --- > > Key: SPARK-26776 > URL: https://issues.apache.org/jira/browse/SPARK-26776 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.0, 3.0.0 >Reporter: Hyukjin Kwon >Priority: Minor > > I am investigating flaky tests. I realised that: > {code} > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2512, in __init__ > self.is_barrier = prev._is_barrier() or isFromBarrier > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line > 2412, in _is_barrier > return self._jrdd.rdd().isBarrier() > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", > line 342, in get_return_value > return OUTPUT_CONVERTER[type](answer[2:], gateway_client) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 2492, in > lambda target_id, gateway_client: JavaObject(target_id, > gateway_client)) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1324, in __init__ > ThreadSafeFinalizer.add_finalizer(key, value) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", > line 43, in add_finalizer > cls.finalizers[id] = weak_ref > File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in > 
__exit__ > self.release() > File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in > release > self.__block.release() > error: release unlocked lock > {code} > I assume it might not be directly related to the test itself, but I noticed > that prev._is_barrier() attempts to access the JVM via Py4J. > Accessing via Py4J is expensive and IMHO it makes the test flaky. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-25420) Dataset.count() every time is different.
[ https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755583#comment-16755583 ] Jungtaek Lim commented on SPARK-25420: -- [~jeffrey.mak] Could you provide a data example and query to reproduce the case? From the description, the issue looks like "not a bug": the order of rows is non-deterministic unless we put a sort on it, and dropDuplicates will keep only the first row of each group, which is also non-deterministic. If you have a reproducible case which doesn't fall into this, please share your case. > Dataset.count() every time is different. > - > > Key: SPARK-25420 > URL: https://issues.apache.org/jira/browse/SPARK-25420 > Project: Spark > Issue Type: Question > Components: Spark Core >Affects Versions: 2.3.0 > Environment: spark2.3 > standalone >Reporter: huanghuai >Priority: Major > Labels: SQL > > Dataset dataset = sparkSession.read().format("csv").option("sep", > ",").option("inferSchema", "true") > .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true") > .option("encoding", "UTF-8") > .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv"); > System.out.println("source count="+dataset.count()); > Dataset dropDuplicates = dataset.dropDuplicates(new > String[]\{"DATE","TIME","VEL","COMPANY"}); > System.out.println("dropDuplicates count1="+dropDuplicates.count()); > System.out.println("dropDuplicates count2="+dropDuplicates.count()); > Dataset filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 > and (status = 0 or status = 1)"); > System.out.println("filter count1="+filter.count()); > System.out.println("filter count2="+filter.count()); > System.out.println("filter count3="+filter.count()); > System.out.println("filter count4="+filter.count()); > System.out.println("filter count5="+filter.count()); > > > --The above is code > --- > > > console output: > source count=459275 > dropDuplicates count1=453987 > dropDuplicates count2=453987 > filter count1=445798 
> filter count2=445797 > filter count3=445797 > filter count4=445798 > filter count5=445799 > > question: > > Why is filter.count() different every time? > if I remove dropDuplicates() everything will be ok!! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-25420) Dataset.count() every time is different.
[ https://issues.apache.org/jira/browse/SPARK-25420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755546#comment-16755546 ] Jeffrey commented on SPARK-25420: - [~mgaido] Could you elaborate more on why this is not a bug? dropDuplicates() is expected to drop only the rows that are duplicates in a dataframe. I experienced the same non-deterministic behavior when doing a filter after dropDuplicates over a subset of columns. Sometimes it returns fewer rows than I expected; that is, some rows which are not duplicates also got dropped. > Dataset.count() every time is different. > - > > Key: SPARK-25420 > URL: https://issues.apache.org/jira/browse/SPARK-25420 > Project: Spark > Issue Type: Question > Components: Spark Core >Affects Versions: 2.3.0 > Environment: spark2.3 > standalone >Reporter: huanghuai >Priority: Major > Labels: SQL > > Dataset dataset = sparkSession.read().format("csv").option("sep", > ",").option("inferSchema", "true") > .option("escape", Constants.DEFAULT_CSV_ESCAPE).option("header", "true") > .option("encoding", "UTF-8") > .load("hdfs://192.168.1.26:9000/data/caopan/07-08_WithHead30M.csv"); > System.out.println("source count="+dataset.count()); > Dataset dropDuplicates = dataset.dropDuplicates(new > String[]\{"DATE","TIME","VEL","COMPANY"}); > System.out.println("dropDuplicates count1="+dropDuplicates.count()); > System.out.println("dropDuplicates count2="+dropDuplicates.count()); > Dataset filter = dropDuplicates.filter("jd > 120.85 and wd > 30.66 > and (status = 0 or status = 1)"); > System.out.println("filter count1="+filter.count()); > System.out.println("filter count2="+filter.count()); > System.out.println("filter count3="+filter.count()); > System.out.println("filter count4="+filter.count()); > System.out.println("filter count5="+filter.count()); > > > --The above is code > --- > > > console output: > source count=459275 > dropDuplicates count1=453987 > dropDuplicates count2=453987 > filter count1=445798 > filter 
count2=445797 > filter count3=445797 > filter count4=445798 > filter count5=445799 > > question: > > Why is filter.count() different every time? > if I remove dropDuplicates() everything will be ok!! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
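Jungtaek's point can be illustrated without Spark at all: dropDuplicates over a subset of columns keeps one arbitrary row per key, so any column not in the key (here jd, wd, status) can come from a different source row on each run, which changes the result of a later filter. A minimal plain-Python sketch of the "first row of the group" semantics, using hypothetical rows rather than the reporter's data:

```python
def drop_duplicates(rows, key_cols):
    # Mirrors dropDuplicates' behavior: keep the first row seen for each
    # key; every other column's value comes from that arbitrary survivor.
    kept = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        if key not in kept:
            kept[key] = row
    return list(kept.values())

# Two hypothetical rows that share the dedup key but differ in "status".
a = {"DATE": "07-08", "TIME": "10:00", "VEL": 5, "COMPANY": "x", "status": 0}
b = {"DATE": "07-08", "TIME": "10:00", "VEL": 5, "COMPANY": "x", "status": 9}
key_cols = ["DATE", "TIME", "VEL", "COMPANY"]

# Spark's shuffle may deliver either row first; simulate both orders.
count_ab = sum(r["status"] in (0, 1) for r in drop_duplicates([a, b], key_cols))
count_ba = sum(r["status"] in (0, 1) for r in drop_duplicates([b, a], key_cols))
# count_ab == 1 but count_ba == 0: the same filter counts differently
# depending only on which duplicate survived.
```

This is exactly the pattern in the reported output: the dropDuplicates counts are stable (the number of distinct keys never changes), while the filter counts wobble, because the filter reads columns outside the dedup key.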
[jira] [Commented] (SPARK-26732) Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that actually persist data
[ https://issues.apache.org/jira/browse/SPARK-26732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755515#comment-16755515 ] Takeshi Yamamuro commented on SPARK-26732: -- Thanks for pinging me, dongjoon! > Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that > actually persist data > --- > > Key: SPARK-26732 > URL: https://issues.apache.org/jira/browse/SPARK-26732 > Project: Spark > Issue Type: Bug > Components: Tests >Affects Versions: 2.3.2, 2.4.0, 3.0.0 >Reporter: Marcelo Vanzin >Priority: Major > > From > https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/5437/testReport/junit/org.apache.spark/SparkContextInfoSuite/getRDDStorageInfo_only_reports_on_RDDs_that_actually_persist_data/: > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: 0 did not equal 1 > Stacktrace > sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 0 did > not equal 1 > at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528) > at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527) > at > org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560) > at > org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501) > at > org.apache.spark.SparkContextInfoSuite.$anonfun$new$3(SparkContextInfoSuite.scala:63) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26766) Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens
[ https://issues.apache.org/jira/browse/SPARK-26766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755501#comment-16755501 ] Marcelo Vanzin commented on SPARK-26766: The only thing YARN-specific about {{hadoopFSsToAccess}} is the reference to {{STAGING_DIR}}. Even {{FILESYSTEMS_TO_ACCESS}} makes sense for other RMs; it could just be renamed with a more generic key (e.g. under {{spark.kerberos.}}). To reduce complexity, I think it should be fine to just move {{hadoopFSsToAccess}} to {{HadoopFSDelegationTokenProvider}}. The only drawback is looking at {{STAGING_DIR}} outside of YARN; if that's a real concern, just add a check in the code that ignores it if the master is not "yarn". > Remove the list of filesystems from > HadoopDelegationTokenProvider.obtainDelegationTokens > > > Key: SPARK-26766 > URL: https://issues.apache.org/jira/browse/SPARK-26766 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Priority: Minor > > This was discussed in a previous PR > [here|https://github.com/apache/spark/pull/23499/files#diff-406f99efa37915001b613de47815e25cR54]. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
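Marcelo's suggestion amounts to moving the filesystem-collection logic into {{HadoopFSDelegationTokenProvider}} and gating the staging-directory lookup on the master. A rough sketch of that shape in Python; the function name, signature, and string values are illustrative, not Spark's actual internals:

```python
def filesystems_to_access(master, configured, default_fs, staging_dir=None):
    # Hypothetical sketch of the logic described in the comment: collect
    # every filesystem that needs delegation tokens, but only consult the
    # YARN staging directory when the master actually is YARN.
    filesystems = set(configured)          # user-configured extra filesystems
    filesystems.add(default_fs)            # the default Hadoop filesystem
    if master == "yarn" and staging_dir is not None:
        filesystems.add(staging_dir)       # STAGING_DIR is ignored elsewhere
    return filesystems

on_yarn = filesystems_to_access(
    "yarn", ["hdfs://nn1"], "hdfs://default", staging_dir="hdfs://staging")
on_k8s = filesystems_to_access(
    "k8s", ["hdfs://nn1"], "hdfs://default", staging_dir="hdfs://staging")
```

With this check in place, moving the helper out of the YARN module costs nothing for other resource managers: they simply never see the staging directory.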
[jira] [Commented] (SPARK-26732) Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that actually persist data
[ https://issues.apache.org/jira/browse/SPARK-26732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755412#comment-16755412 ] Dongjoon Hyun commented on SPARK-26732: --- cc [~maropu]. > Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that > actually persist data > --- > > Key: SPARK-26732 > URL: https://issues.apache.org/jira/browse/SPARK-26732 > Project: Spark > Issue Type: Bug > Components: Tests >Affects Versions: 2.3.2, 2.4.0, 3.0.0 >Reporter: Marcelo Vanzin >Priority: Major > > From > https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/5437/testReport/junit/org.apache.spark/SparkContextInfoSuite/getRDDStorageInfo_only_reports_on_RDDs_that_actually_persist_data/: > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: 0 did not equal 1 > Stacktrace > sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 0 did > not equal 1 > at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528) > at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527) > at > org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560) > at > org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501) > at > org.apache.spark.SparkContextInfoSuite.$anonfun$new$3(SparkContextInfoSuite.scala:63) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26732) Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that actually persist data
[ https://issues.apache.org/jira/browse/SPARK-26732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-26732: -- Affects Version/s: 2.3.2 2.4.0 > Flaky test: SparkContextInfoSuite.getRDDStorageInfo only reports on RDDs that > actually persist data > --- > > Key: SPARK-26732 > URL: https://issues.apache.org/jira/browse/SPARK-26732 > Project: Spark > Issue Type: Bug > Components: Tests >Affects Versions: 2.3.2, 2.4.0, 3.0.0 >Reporter: Marcelo Vanzin >Priority: Major > > From > https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/5437/testReport/junit/org.apache.spark/SparkContextInfoSuite/getRDDStorageInfo_only_reports_on_RDDs_that_actually_persist_data/: > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: 0 did not equal 1 > Stacktrace > sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 0 did > not equal 1 > at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528) > at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527) > at > org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560) > at > org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501) > at > org.apache.spark.SparkContextInfoSuite.$anonfun$new$3(SparkContextInfoSuite.scala:63) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-25035) Replicating disk-stored blocks should avoid memory mapping
[ https://issues.apache.org/jira/browse/SPARK-25035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-25035: Assignee: (was: Apache Spark) > Replicating disk-stored blocks should avoid memory mapping > -- > > Key: SPARK-25035 > URL: https://issues.apache.org/jira/browse/SPARK-25035 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Imran Rashid >Priority: Major > Labels: memory-analysis > > This is a follow-up to SPARK-24296. > When replicating a disk-cached block, even if we fetch-to-disk, we still > memory-map the file, just to copy it to another location. > Ideally we'd just move the tmp file to the right location. But even without > that, we could read the file as an input stream, instead of memory-mapping > the whole thing. Memory-mapping is particularly a problem when running under > yarn, as the OS may believe there is plenty of memory available, meanwhile > yarn decides to kill the process for exceeding memory limits. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-25035) Replicating disk-stored blocks should avoid memory mapping
[ https://issues.apache.org/jira/browse/SPARK-25035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-25035: Assignee: Apache Spark > Replicating disk-stored blocks should avoid memory mapping > -- > > Key: SPARK-25035 > URL: https://issues.apache.org/jira/browse/SPARK-25035 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Imran Rashid >Assignee: Apache Spark >Priority: Major > Labels: memory-analysis > > This is a follow-up to SPARK-24296. > When replicating a disk-cached block, even if we fetch-to-disk, we still > memory-map the file, just to copy it to another location. > Ideally we'd just move the tmp file to the right location. But even without > that, we could read the file as an input stream, instead of memory-mapping > the whole thing. Memory-mapping is particularly a problem when running under > yarn, as the OS may believe there is plenty of memory available, meanwhile > yarn decides to kill the process for exceeding memory limits. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26718) Fixed integer overflow in SS kafka rateLimit calculation
[ https://issues.apache.org/jira/browse/SPARK-26718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-26718: -- Fix Version/s: 2.4.1 > Fixed integer overflow in SS kafka rateLimit calculation > > > Key: SPARK-26718 > URL: https://issues.apache.org/jira/browse/SPARK-26718 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Ryne Yang >Assignee: Ryne Yang >Priority: Major > Fix For: 2.4.1, 3.0.0 > > > when running spark structured streaming using lib: `"org.apache.spark" %% > "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting error regarding current > offset fetching: > {code:java} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in > stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): > java.lang.AssertionError: assertion failed: latest offs > et -9223372036854775808 does not equal -1 > at scala.Predef$.assert(Predef.scala:170) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.(KafkaMicroBatchReader.scala:329) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314) > at > org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) > at org.apache.spark.scheduler.Task.run(Task.scala:121) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > for some reason, looks like fetchLatestOffset returned a Long.MIN_VALUE for > one of the partitions. I checked the structured streaming checkpoint, that > was correct, it's the currentAvailableOffset was set to Long.MIN_VALUE. > kafka broker version: 1.1.0. > lib we used: > {\{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % > "2.4.0" }} > how to reproduce: > basically we started a structured streamer and subscribed a topic of 4 > partitions. then produced some messages into topic, job crashed and logged > the stacktrace like above. 
> also the committed offsets seem fine as we see in the logs: > {code:java} > === Streaming Query === > Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = > 31878627-d473-4ee8-955d-d4d3f3f45eb9] > Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: > {"REVENUEEVENT":{"0":1}}} > Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: > {"REVENUEEVENT":{"0":-9223372036854775808}}} > {code} > so spark streaming recorded the correct value for partition: 0, but the > current available offsets returned from kafka is showing Long.MIN_VALUE. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
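The renamed summary points at the mechanism: when an offset near Long.MaxValue is added to a prorated limit, Java long addition wraps silently into a huge negative value, which is one plausible way to end up with the Long.MIN_VALUE (-9223372036854775808) seen above. A Python sketch of the wrap and of an overflow-safe clamped alternative; this is illustrative only, not the code from the fix:

```python
LONG_MAX = 2**63 - 1
LONG_MIN = -(2**63)

def java_long_add(a, b):
    # Java `long` addition wraps silently in 64-bit two's complement.
    s = (a + b) & (2**64 - 1)
    return s - 2**64 if s >= 2**63 else s

def saturating_add(a, b):
    # Overflow-safe variant: clamp the result to the Long range instead
    # of wrapping. (Python ints are unbounded, so the check is explicit.)
    return min(max(a + b, LONG_MIN), LONG_MAX)

# An offset close to Long.MaxValue plus a prorated rate limit wraps
# negative with plain addition...
wrapped = java_long_add(LONG_MAX - 10, 100)
# ...while the clamped version pins the result to the end of the range.
clamped = saturating_add(LONG_MAX - 10, 100)
```

The clamped form is the safe pattern for rate-limit arithmetic: an oversized limit should saturate at "read everything available", never wrap into an impossible negative offset that later trips the `latest offset ... does not equal -1` assertion.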
[jira] [Updated] (SPARK-26775) Update Jenkins nodes to support local volumes for K8s integration tests
[ https://issues.apache.org/jira/browse/SPARK-26775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stavros Kontopoulos updated SPARK-26775: Description: The current version of Minikube on the test machines does not properly support the local persistent volume feature required by this PR: [https://github.com/apache/spark/pull/23514]. We get this error: "spec.local: Forbidden: Local volumes are disabled by feature-gate, metadata.annotations: Required value: Local volume requires node affinity" This is probably due to this: [https://github.com/rancher/rancher/issues/13864] which implies that we need to update to 1.10+ as described in [https://kubernetes.io/docs/concepts/storage/volumes/#local]. Fabric8io client is already updated in the PR mentioned at the beginning. was: Current version of Minikube on test machines does not support properly the local persistent volume feature required by this PR: https://github.com/apache/spark/pull/23514. We get his error: "spec.local: Forbidden: Local volumes are disabled by feature-gate, metadata.annotations: Required value: Local volume requires node affinity" This is probably due to this: [https://github.com/rancher/rancher/issues/13864] which implies that we need to update to 1.10+ as described in [https://kubernetes.io/docs/concepts/storage/volumes/#loca,l|https://kubernetes.io/docs/concepts/storage/volumes/#local] since fabric8io client is already updated in the PR mentioned at the beginning. > Update Jenkins nodes to support local volumes for K8s integration tests > --- > > Key: SPARK-26775 > URL: https://issues.apache.org/jira/browse/SPARK-26775 > Project: Spark > Issue Type: Improvement > Components: jenkins, Kubernetes >Affects Versions: 3.0.0 >Reporter: Stavros Kontopoulos >Priority: Major > > The current version of Minikube on the test machines does not properly > support the local persistent volume feature required by this PR: > [https://github.com/apache/spark/pull/23514]. 
> We get this error: > "spec.local: Forbidden: Local volumes are disabled by feature-gate, > metadata.annotations: Required value: Local volume requires node affinity" > This is probably due to this: > [https://github.com/rancher/rancher/issues/13864] which implies that we need > to update to 1.10+ as described in > [https://kubernetes.io/docs/concepts/storage/volumes/#local]. Fabric8io > client is already updated in the PR mentioned at the beginning. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26775) Update Jenkins nodes to support local volumes for K8s integration tests
Stavros Kontopoulos created SPARK-26775: --- Summary: Update Jenkins nodes to support local volumes for K8s integration tests Key: SPARK-26775 URL: https://issues.apache.org/jira/browse/SPARK-26775 Project: Spark Issue Type: Improvement Components: jenkins, Kubernetes Affects Versions: 3.0.0 Reporter: Stavros Kontopoulos The current version of Minikube on the test machines does not properly support the local persistent volume feature required by this PR: https://github.com/apache/spark/pull/23514. We get this error: "spec.local: Forbidden: Local volumes are disabled by feature-gate, metadata.annotations: Required value: Local volume requires node affinity" This is probably due to this: [https://github.com/rancher/rancher/issues/13864] which implies that we need to update to 1.10+ as described in [https://kubernetes.io/docs/concepts/storage/volumes/#local] since fabric8io client is already updated in the PR mentioned at the beginning. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
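The error message itself says what a working spec needs on Kubernetes 1.10+: a local PersistentVolume must carry node affinity. A hedged example of such a spec; the PV name, host path, and hostname value are hypothetical, not taken from the Jenkins setup:

```yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: spark-test-local-pv        # hypothetical name
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: /tmp/spark-local-dir     # hypothetical host path
  nodeAffinity:                    # required, per the error above
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values:
                - minikube
```

On pre-1.10 clusters even a spec like this is rejected unless the local-volumes feature gate is enabled, which is why upgrading the Jenkins Minikube is the actual fix here.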
[jira] [Commented] (SPARK-25035) Replicating disk-stored blocks should avoid memory mapping
[ https://issues.apache.org/jira/browse/SPARK-25035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755322#comment-16755322 ] Attila Zsolt Piros commented on SPARK-25035: I am working on this. > Replicating disk-stored blocks should avoid memory mapping > -- > > Key: SPARK-25035 > URL: https://issues.apache.org/jira/browse/SPARK-25035 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.1 >Reporter: Imran Rashid >Priority: Major > Labels: memory-analysis > > This is a follow-up to SPARK-24296. > When replicating a disk-cached block, even if we fetch-to-disk, we still > memory-map the file, just to copy it to another location. > Ideally we'd just move the tmp file to the right location. But even without > that, we could read the file as an input stream, instead of memory-mapping > the whole thing. Memory-mapping is particularly a problem when running under > yarn, as the OS may believe there is plenty of memory available, meanwhile > yarn decides to kill the process for exceeding memory limits. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
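The contrast the issue describes, memory-mapping a whole file just to copy it versus reading it as a bounded-size stream, can be sketched as follows. This is illustrative Python, not the BlockManager code; the point is that the streaming variant's peak memory is bounded by the chunk size no matter how large the block file is:

```python
import mmap
import os
import shutil
import tempfile

def copy_by_mmap(src_path, dst_path):
    # What the issue complains about: map the whole file into the process
    # address space just to copy its bytes somewhere else.
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        with mmap.mmap(src.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
            dst.write(mapped)

def copy_as_stream(src_path, dst_path, chunk_size=64 * 1024):
    # The proposed alternative: read the file as an input stream, copying
    # one bounded chunk at a time.
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        shutil.copyfileobj(src, dst, chunk_size)

tmp = tempfile.mkdtemp()
src = os.path.join(tmp, "block.bin")
with open(src, "wb") as f:
    f.write(os.urandom(256 * 1024))  # stand-in for a disk-stored block

copy_by_mmap(src, os.path.join(tmp, "replica_mmap.bin"))
copy_as_stream(src, os.path.join(tmp, "replica_stream.bin"))

same = (open(os.path.join(tmp, "replica_mmap.bin"), "rb").read()
        == open(os.path.join(tmp, "replica_stream.bin"), "rb").read())
```

The mmap variant is also what interacts badly with YARN's memory accounting: mapped pages count toward the process's observed footprint even though the OS can reclaim them, so a large replication can get the executor killed for "exceeding" memory limits.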
[jira] [Updated] (SPARK-26718) Fixed integer overflow in SS kafka rateLimit calculation
[ https://issues.apache.org/jira/browse/SPARK-26718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-26718: -- Summary: Fixed integer overflow in SS kafka rateLimit calculation (was: structured streaming fetched wrong current offset from kafka) > Fixed integer overflow in SS kafka rateLimit calculation > > > Key: SPARK-26718 > URL: https://issues.apache.org/jira/browse/SPARK-26718 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Ryne Yang >Assignee: Ryne Yang >Priority: Major > > when running spark structured streaming using lib: `"org.apache.spark" %% > "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting error regarding current > offset fetching: > {code:java} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in > stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): > java.lang.AssertionError: assertion failed: latest offs > et -9223372036854775808 does not equal -1 > at scala.Predef$.assert(Predef.scala:170) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.(KafkaMicroBatchReader.scala:329) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314) > at > org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) > at org.apache.spark.scheduler.Task.run(Task.scala:121) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > for some reason, looks like fetchLatestOffset returned a Long.MIN_VALUE for > one of the partitions. I checked the structured streaming checkpoint, that > was correct, it's the currentAvailableOffset was set to Long.MIN_VALUE. > kafka broker version: 1.1.0. > lib we used: > {\{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % > "2.4.0" }} > how to reproduce: > basically we started a structured streamer and subscribed a topic of 4 > partitions. then produced some messages into topic, job crashed and logged > the stacktrace like above. 
> also the committed offsets seem fine as we see in the logs: > {code:java} > === Streaming Query === > Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = > 31878627-d473-4ee8-955d-d4d3f3f45eb9] > Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: > {"REVENUEEVENT":{"0":1}}} > Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: > {"REVENUEEVENT":{"0":-9223372036854775808}}} > {code} > so spark streaming recorded the correct value for partition: 0, but the > current available offsets returned from kafka is showing Long.MIN_VALUE. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26718) structured streaming fetched wrong current offset from kafka
[ https://issues.apache.org/jira/browse/SPARK-26718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun reassigned SPARK-26718: - Assignee: Ryne Yang > structured streaming fetched wrong current offset from kafka > > > Key: SPARK-26718 > URL: https://issues.apache.org/jira/browse/SPARK-26718 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Ryne Yang >Assignee: Ryne Yang >Priority: Major > > when running spark structured streaming using lib: `"org.apache.spark" %% > "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting error regarding current > offset fetching: > {code:java} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in > stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): > java.lang.AssertionError: assertion failed: latest offs > et -9223372036854775808 does not equal -1 > at scala.Predef$.assert(Predef.scala:170) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.(KafkaMicroBatchReader.scala:329) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314) > at > org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) > at org.apache.spark.scheduler.Task.run(Task.scala:121) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > for some reason, looks like fetchLatestOffset returned a Long.MIN_VALUE for > one of the partitions. I checked the structured streaming checkpoint, that > was correct, it's the currentAvailableOffset was set to Long.MIN_VALUE. > kafka broker version: 1.1.0. > lib we used: > {\{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % > "2.4.0" }} > how to reproduce: > basically we started a structured streamer and subscribed a topic of 4 > partitions. then produced some messages into topic, job crashed and logged > the stacktrace like above. 
> also the committed offsets seem fine as we see in the logs: > {code:java} > === Streaming Query === > Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = > 31878627-d473-4ee8-955d-d4d3f3f45eb9] > Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: > {"REVENUEEVENT":{"0":1}}} > Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: > {"REVENUEEVENT":{"0":-9223372036854775808}}} > {code} > so spark streaming recorded the correct value for partition: 0, but the > current available offsets returned from kafka is showing Long.MIN_VALUE. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26718) Fixed integer overflow in SS kafka rateLimit calculation
[ https://issues.apache.org/jira/browse/SPARK-26718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-26718. --- Resolution: Fixed Fix Version/s: 3.0.0 This is resolved via https://github.com/apache/spark/pull/23666 > Fixed integer overflow in SS kafka rateLimit calculation > > > Key: SPARK-26718 > URL: https://issues.apache.org/jira/browse/SPARK-26718 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Ryne Yang >Assignee: Ryne Yang >Priority: Major > Fix For: 3.0.0 > > > when running spark structured streaming using lib: `"org.apache.spark" %% > "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting error regarding current > offset fetching: > {code:java} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in > stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): > java.lang.AssertionError: assertion failed: latest offs > et -9223372036854775808 does not equal -1 > at scala.Predef$.assert(Predef.scala:170) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.(KafkaMicroBatchReader.scala:329) > at > org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314) > at > org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) > at org.apache.spark.scheduler.Task.run(Task.scala:121) > at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > For some reason, it looks like fetchLatestOffset returned Long.MIN_VALUE for > one of the partitions. I checked the structured streaming checkpoint; that > was correct. It was the currentAvailableOffset that was set to Long.MIN_VALUE. > kafka broker version: 1.1.0. > lib we used: > {{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % > "2.4.0"}} > how to reproduce: > basically we started a structured streamer and subscribed to a topic of 4 > partitions, then produced some messages into the topic; the job crashed and logged > the stacktrace above. 
> also the committed offsets seem fine as we see in the logs: > {code:java} > === Streaming Query === > Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = > 31878627-d473-4ee8-955d-d4d3f3f45eb9] > Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: > {"REVENUEEVENT":{"0":1}}} > Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: > {"REVENUEEVENT":{"0":-9223372036854775808}}} > {code} > so spark streaming recorded the correct value for partition: 0, but the > current available offsets returned from kafka is showing Long.MIN_VALUE. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
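The assertion failure above is characteristic of 64-bit signed overflow: once a Long.MIN_VALUE sentinel leaks into offset arithmetic, subtraction and addition silently wrap around. A minimal Python sketch that simulates Java long semantics explicitly (`proportional_limit` and its names are illustrative, not Spark's actual rateLimit code):

```python
# Illustrative sketch, NOT Spark's code: shows how a Long.MIN_VALUE
# sentinel offset wraps around in Java-style 64-bit arithmetic
# during a rate-limit style calculation.

LONG_MIN = -2**63
LONG_MAX = 2**63 - 1

def to_java_long(n):
    """Wrap an arbitrary Python int to Java's two's-complement 64-bit long."""
    n &= (1 << 64) - 1
    return n - (1 << 64) if n >= (1 << 63) else n

def proportional_limit(begin, end, max_records):
    """Take at most max_records from the [begin, end) offset range,
    computing the lag with Java long semantics."""
    lag = to_java_long(end - begin)  # wraps if 'end' is a sentinel
    if lag <= 0:
        return begin
    return to_java_long(begin + min(lag, max_records))

# Sane range: result is bounded as expected (100 + 10 = 110).
ok = proportional_limit(100, 200, 10)
# With the Long.MIN_VALUE sentinel, the subtraction wraps to a huge
# positive "lag" instead of a small negative one:
bad_lag = to_java_long(LONG_MIN - 100)
```

This is why a sentinel like Long.MIN_VALUE must be checked for explicitly before it enters any range arithmetic.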
[jira] [Commented] (SPARK-25994) SPIP: Property Graphs, Cypher Queries, and Algorithms
[ https://issues.apache.org/jira/browse/SPARK-25994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755272#comment-16755272 ] Saikat Kanjilal commented on SPARK-25994: - [~mju] I would like to help out on this issue on the design/implementation, thoughts on the best steps to get involved, I will review the doc above as a first step > SPIP: Property Graphs, Cypher Queries, and Algorithms > - > > Key: SPARK-25994 > URL: https://issues.apache.org/jira/browse/SPARK-25994 > Project: Spark > Issue Type: New Feature > Components: GraphX >Affects Versions: 3.0.0 >Reporter: Xiangrui Meng >Assignee: Martin Junghanns >Priority: Major > Labels: SPIP > > Copied from the SPIP doc: > {quote} > GraphX was one of the foundational pillars of the Spark project, and is the > current graph component. This reflects the importance of the graphs data > model, which naturally pairs with an important class of analytic function, > the network or graph algorithm. > However, GraphX is not actively maintained. It is based on RDDs, and cannot > exploit Spark 2’s Catalyst query engine. GraphX is only available to Scala > users. > GraphFrames is a Spark package, which implements DataFrame-based graph > algorithms, and also incorporates simple graph pattern matching with fixed > length patterns (called “motifs”). GraphFrames is based on DataFrames, but > has a semantically weak graph data model (based on untyped edges and > vertices). The motif pattern matching facility is very limited by comparison > with the well-established Cypher language. > The Property Graph data model has become quite widespread in recent years, > and is the primary focus of commercial graph data management and of graph > data research, both for on-premises and cloud data management. Many users of > transactional graph databases also wish to work with immutable graphs in > Spark. 
> The idea is to define a Cypher-compatible Property Graph type based on > DataFrames; to replace GraphFrames querying with Cypher; to reimplement > GraphX/GraphFrames algos on the PropertyGraph type. > To achieve this goal, a core subset of Cypher for Apache Spark (CAPS), > reusing existing proven designs and code, will be employed in Spark 3.0. This > graph query processor, like CAPS, will overlay and drive the SparkSQL > Catalyst query engine, using the CAPS graph query planner. > {quote}
[jira] [Created] (SPARK-26774) Document threading concerns in TaskSchedulerImpl
Imran Rashid created SPARK-26774: Summary: Document threading concerns in TaskSchedulerImpl Key: SPARK-26774 URL: https://issues.apache.org/jira/browse/SPARK-26774 Project: Spark Issue Type: Improvement Components: Scheduler, Spark Core Affects Versions: 3.0.0 Reporter: Imran Rashid TaskSchedulerImpl has a bunch of threading concerns which are not well documented -- in fact, the docs it has are somewhat misleading. In particular, some of the methods should only be called within the DAGScheduler event loop. This suggests some potential refactoring to avoid so many mixed concerns inside TaskSchedulerImpl, but that's a lot harder to do safely; for now I just want to add some comments.
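One way to make "call this only from the DAGScheduler event loop" more than a comment is a runtime thread check alongside the documentation. A hedged sketch with illustrative names (not TaskSchedulerImpl's actual code):

```python
# Illustrative sketch, not Spark's code: document AND enforce a
# "must be called from the event-loop thread" contract.

import threading

class EventLoopOwned:
    def __init__(self):
        # Thread that owns the event loop; captured at construction
        # here for simplicity.
        self._event_loop_thread = threading.current_thread()

    def _check_on_event_loop(self):
        if threading.current_thread() is not self._event_loop_thread:
            raise RuntimeError("must be called from the event-loop thread")

    def handle_task_completion(self):
        # Documented and checked: only legal on the event-loop thread.
        self._check_on_event_loop()
        return "handled"
```

A check like this turns a silent threading bug into an immediate, explainable failure, which is roughly what better documentation plus assertions buys in the scheduler.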
[jira] [Resolved] (SPARK-26765) Avro: Validate input and output schema
[ https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-26765. - Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23684 [https://github.com/apache/spark/pull/23684] > Avro: Validate input and output schema > -- > > Key: SPARK-26765 > URL: https://issues.apache.org/jira/browse/SPARK-26765 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Assignee: Gengliang Wang >Priority: Minor > Fix For: 3.0.0 > > > The API supportDataType in FileFormat helps to validate the output/input > schema before execution starts, so that we can avoid some invalid data source > IO and users can see clean error messages. > This PR overrides the validation API in the Avro data source. > Also, as per the spec of Avro (https://avro.apache.org/docs/1.8.2/spec.html), > NullType is supported. This PR fixes the handling of NullType.
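The general shape of such up-front validation: walk the whole required schema before any IO starts and fail with a clean, per-column message on the first unsupported type. A sketch in which both the helper name and the supported-type set are illustrative, not Spark's supportDataType API:

```python
# Hypothetical supported-type set; the real rule lives in the Avro
# data source's supportDataType override.
SUPPORTED = {"int", "long", "string", "boolean", "null"}

def validate_schema(fields):
    """fields: list of (name, type) pairs. Fail fast, before any IO,
    with a clean per-column error message."""
    for name, dtype in fields:
        if dtype not in SUPPORTED:
            raise TypeError(
                f"data source does not support {dtype} type (column '{name}')")
    return True
```

Failing here, rather than deep inside a write job, is what gives users the "clean error messages" the issue describes.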
[jira] [Assigned] (SPARK-26765) Avro: Validate input and output schema
[ https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-26765: --- Assignee: Gengliang Wang > Avro: Validate input and output schema > -- > > Key: SPARK-26765 > URL: https://issues.apache.org/jira/browse/SPARK-26765 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Assignee: Gengliang Wang >Priority: Minor > > > The API supportDataType in FileFormat helps to validate the output/input > schema before execution starts, so that we can avoid some invalid data source > IO and users can see clean error messages. > This PR overrides the validation API in the Avro data source. > Also, as per the spec of Avro (https://avro.apache.org/docs/1.8.2/spec.html), > NullType is supported. This PR fixes the handling of NullType.
[jira] [Commented] (SPARK-26752) Multiple aggregate methods in the same column in DataFrame
[ https://issues.apache.org/jira/browse/SPARK-26752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755156#comment-16755156 ] Guilherme Beltramini commented on SPARK-26752: -- Thanks for the input! I also agree with [~mgaido] about deprecating the input Map[String, String]. > Multiple aggregate methods in the same column in DataFrame > -- > > Key: SPARK-26752 > URL: https://issues.apache.org/jira/browse/SPARK-26752 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Guilherme Beltramini >Priority: Minor > > The agg function in > [org.apache.spark.sql.RelationalGroupedDataset|https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.RelationalGroupedDataset] > accepts as input: > * Column* > * Map[String, String] > * (String, String)* > I'm proposing to add Map[String, Seq[String]], where the keys are the columns > to aggregate, and the values are the functions to apply the aggregation. Here > is a similar question: > http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-multiple-agg-on-the-same-column-td29541.html. > In the example below (running in spark-shell, with Spark 2.4.0), I'm showing > a workaround. 
What I'm proposing is that agg should accept aggMap as input: > {code:java} > scala> val df = Seq(("a", 1), ("a", 2), ("a", 3), ("a", 4), ("b", 10), ("b", > 20), ("c", 100)).toDF("col1", "col2") > df: org.apache.spark.sql.DataFrame = [col1: string, col2: int] > scala> df.show > +++ > |col1|col2| > +++ > | a| 1| > | a| 2| > | a| 3| > | a| 4| > | b| 10| > | b| 20| > | c| 100| > +++ > scala> val aggMap = Map("col1" -> Seq("count"), "col2" -> Seq("min", "max", > "mean")) > aggMap: scala.collection.immutable.Map[String,Seq[String]] = Map(col1 -> > List(count), col2 -> List(min, max, mean)) > scala> val aggSeq = aggMap.toSeq.flatMap{ case (c: String, fns: Seq[String]) > => Seq(c).zipAll(fns, c, "") } > aggSeq: Seq[(String, String)] = ArrayBuffer((col1,count), (col2,min), > (col2,max), (col2,mean)) > scala> val dfAgg = df.groupBy("col1").agg(aggSeq.head, aggSeq.tail: _*) > dfAgg: org.apache.spark.sql.DataFrame = [col1: string, count(col1): bigint > ... 3 more fields] > scala> dfAgg.orderBy("col1").show > ++---+-+-+-+ > |col1|count(col1)|min(col2)|max(col2)|avg(col2)| > ++---+-+-+-+ > | a| 4|1|4| 2.5| > | b| 2| 10| 20| 15.0| > | c| 1| 100| 100|100.0| > ++---+-+-+-+ > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
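The Scala workaround above boils down to flattening Map[col -> functions] into (col, function) pairs before handing them to the existing varargs agg. The same transformation as a Python sketch:

```python
def flatten_agg_map(agg_map):
    """{'col2': ['min', 'max']} -> [('col2', 'min'), ('col2', 'max')],
    mirroring the aggSeq flatMap in the Scala snippet above."""
    return [(col, fn) for col, fns in agg_map.items() for fn in fns]

pairs = flatten_agg_map({"col1": ["count"], "col2": ["min", "max", "mean"]})
# The head/tail of `pairs` can then feed an agg(head, tail: _*)-style
# varargs API, exactly as the spark-shell workaround does.
```

If agg accepted the map directly, this flattening would simply live inside RelationalGroupedDataset instead of in user code.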
[jira] [Updated] (SPARK-26772) YARNHadoopDelegationTokenManager should load ServiceCredentialProviders independently
[ https://issues.apache.org/jira/browse/SPARK-26772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi updated SPARK-26772: -- Description: YARNHadoopDelegationTokenManager now loads ServiceCredentialProviders in one step. The drawback of this is that if one provider fails, all the others are not loaded. > YARNHadoopDelegationTokenManager should load ServiceCredentialProviders > independently > - > > Key: SPARK-26772 > URL: https://issues.apache.org/jira/browse/SPARK-26772 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Priority: Major > > YARNHadoopDelegationTokenManager now loads ServiceCredentialProviders in one > step. The drawback of this is that if one provider fails, all the others are > not loaded.
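The proposed behavior can be sketched as loading each provider in its own try/except, so one broken provider no longer prevents the rest from loading. Illustrative code, not YARNHadoopDelegationTokenManager's actual loader:

```python
def load_providers(factories, log=print):
    """Load each credential-provider factory independently; a failure in
    one is logged and skipped instead of aborting the whole load.
    (Illustrative sketch, not Spark's code.)"""
    providers = []
    for factory in factories:
        try:
            providers.append(factory())
        except Exception as exc:  # one bad provider must not sink the rest
            log(f"Failed to load provider {factory!r}: {exc}")
    return providers
```

With the current all-in-one-step loading, the equivalent of the except branch aborts everything; per-provider isolation is the whole improvement.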
[jira] [Resolved] (SPARK-26702) Create a test trait for Parquet and Orc test
[ https://issues.apache.org/jira/browse/SPARK-26702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-26702. --- Resolution: Fixed Assignee: Liang-Chi Hsieh Fix Version/s: 3.0.0 This is resolved via https://github.com/apache/spark/pull/23628 > Create a test trait for Parquet and Orc test > > > Key: SPARK-26702 > URL: https://issues.apache.org/jira/browse/SPARK-26702 > Project: Spark > Issue Type: Test > Components: SQL >Affects Versions: 3.0.0 >Reporter: Liang-Chi Hsieh >Assignee: Liang-Chi Hsieh >Priority: Major > Fix For: 3.0.0 > > > To make test suites support both Parquet and Orc by reusing test cases, > this patch extracts the shared testing methods. For example, if we need to test > a common feature shared by Parquet and Orc, we should be able to write test > cases once and reuse them to test both formats.
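The shared-trait idea: write each test once against an abstract format, then instantiate it per concrete format. A Python sketch with hypothetical names (the real patch is a Scala test trait; the path below is illustrative and no real IO happens):

```python
# Illustrative sketch of a format-parameterized test base class.
class FileFormatTestBase:
    format_name = None  # overridden per concrete suite

    def roundtrip_ok(self, write, read):
        """One shared test case: written data reads back unchanged.
        write/read are injected so the sketch stays self-contained."""
        data = [1, 2, 3]
        path = f"/tmp/demo.{self.format_name}"  # hypothetical path
        write(path, data)
        return read(path) == data

class ParquetSuite(FileFormatTestBase):
    format_name = "parquet"

class OrcSuite(FileFormatTestBase):
    format_name = "orc"
```

Each format-specific suite only supplies what differs (here, just the format name); every shared case is written once in the base.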
[jira] [Created] (SPARK-26773) Consider alternative base images for Kubernetes
Ondrej Kokes created SPARK-26773: Summary: Consider alternative base images for Kubernetes Key: SPARK-26773 URL: https://issues.apache.org/jira/browse/SPARK-26773 Project: Spark Issue Type: Improvement Components: Kubernetes, PySpark Affects Versions: 2.4.0 Reporter: Ondrej Kokes I understand the desire to make the base image (not just) for Kubernetes to be minimal and thus the choice of Alpine, but that distro has its limitations. The main one being musl as its libc implementation. The main reason for us not to use Alpine for our non-Spark workloads is that we're using Python and *we cannot use pre-built distributions of packages (so-called wheels)*, because they are usually built for glibc-based distros (work is being done for musl-based builds, but we're not there yet [0]). So instead of popular packages like numpy or pandas being installed in seconds, a build process has to be initiated upon each installation of many packages (and that requires a compiler etc.). We could theoretically build all these packages into the base image, but that would require multi-step builds, so that we don't include gcc/clang in the final image, having to rebuild the docker image with each dependency change etc. There have already been similar issues submitted [1]. *I'm not sure what the best course of action is.* If there should be a e.g. debian-based distro as an alternative. Or perhaps there could be a good reason for a glibc-based distro to be the default Docker base image, with an option to "downgrade" to Alpine. (I'm guessing that R, with its popular Rcpp-based extensions, might suffer from a similar problem, but I'm mostly guessing. 
[2]) [0] https://www.python.org/dev/peps/pep-0513/ [1] https://github.com/apache-spark-on-k8s/spark/issues/326 [2] https://github.com/rocker-org/rocker/issues/231#issuecomment-297150217
[jira] [Resolved] (SPARK-26763) Using fileStatus cache when filterPartitions
[ https://issues.apache.org/jira/browse/SPARK-26763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-26763. - Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23683 [https://github.com/apache/spark/pull/23683] > Using fileStatus cache when filterPartitions > > > Key: SPARK-26763 > URL: https://issues.apache.org/jira/browse/SPARK-26763 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Xianyang Liu >Assignee: Xianyang Liu >Priority: Major > Fix For: 3.0.0 > > > We should pass the existing `fileStatusCache` to `InMemoryFileIndex` even > when there are no partition columns.
[jira] [Resolved] (SPARK-11215) Add multiple columns support to StringIndexer
[ https://issues.apache.org/jira/browse/SPARK-11215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-11215. --- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 20146 [https://github.com/apache/spark/pull/20146] > Add multiple columns support to StringIndexer > - > > Key: SPARK-11215 > URL: https://issues.apache.org/jira/browse/SPARK-11215 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.4.0 >Reporter: Yanbo Liang >Assignee: Yanbo Liang >Priority: Major > Labels: release-notes > Fix For: 3.0.0 > > > Add multiple columns support to StringIndexer, so that users can transform > multiple input columns to multiple output columns simultaneously. See > discussion SPARK-8418.
[jira] [Updated] (SPARK-26771) Make .unpersist(), .destroy() consistently non-blocking by default
[ https://issues.apache.org/jira/browse/SPARK-26771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-26771: -- Docs Text: The RDD and DataFrame .unpersist() method, and Broadcast .destroy() method, take an optional 'blocking' argument. The default was 'false' in all cases except for (Scala) RDDs and their GraphX subclasses. The default is now 'false' (non-blocking) in all of these methods. > Make .unpersist(), .destroy() consistently non-blocking by default > -- > > Key: SPARK-26771 > URL: https://issues.apache.org/jira/browse/SPARK-26771 > Project: Spark > Issue Type: Improvement > Components: GraphX, Spark Core >Affects Versions: 2.4.0 >Reporter: Sean Owen >Assignee: Sean Owen >Priority: Major > Labels: release-notes > > See https://issues.apache.org/jira/browse/SPARK-26728 and > https://github.com/apache/spark/pull/23650 . > RDD and DataFrame expose an .unpersist() method with optional "blocking" > argument. So does Broadcast.destroy(). This argument is false by default > except for the Scala RDD (not Pyspark) implementation and its GraphX > subclasses. Most usages of these methods request non-blocking behavior > already, and indeed, it's not typical to want to wait for the resources to be > freed, except in tests asserting behavior about these methods (where blocking > is typically requested). > This proposes to make the default false across these methods, and adjust > callers to only request non-default blocking behavior where important, such > as in a few key tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
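Blocking vs. non-blocking release can be sketched generically: with blocking=false the call returns immediately and cleanup proceeds in the background; with blocking=true it waits for cleanup to finish. Illustrative Python, not Spark's storage code:

```python
# Illustrative sketch of the unpersist(blocking=...) contract; a
# threading.Event stands in for "all cached blocks removed".
import threading

class CachedThing:
    """Stand-in for a cached dataset; not Spark's implementation."""

    def __init__(self):
        self.freed = threading.Event()

    def unpersist(self, blocking=False):
        # Cleanup runs in the background either way...
        worker = threading.Thread(target=self._free)
        worker.start()
        if blocking:
            worker.join()  # old Scala-RDD default: wait for removal
        return self        # new default: caller moves on immediately

    def _free(self):
        self.freed.set()   # stand-in for removing cached blocks
```

Callers rarely need the join; tests that assert on freed state are the main place that should now pass blocking=true explicitly.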
[jira] [Updated] (SPARK-11215) Add multiple columns support to StringIndexer
[ https://issues.apache.org/jira/browse/SPARK-11215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-11215: -- Docs Text: When specifying frequencyDesc or frequencyAsc as stringOrderType param in StringIndexer, in case of equal frequency, the order of strings was previously undefined. Since Spark 3.0, strings with equal frequency are further sorted lexicographically. Affects Version/s: 2.4.0 Labels: release-notes (was: ) > Add multiple columns support to StringIndexer > - > > Key: SPARK-11215 > URL: https://issues.apache.org/jira/browse/SPARK-11215 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.4.0 >Reporter: Yanbo Liang >Assignee: Yanbo Liang >Priority: Major > Labels: release-notes > > Add multiple columns support to StringIndexer, then users can transform > multiple input columns to multiple output columns simultaneously. See > discussion SPARK-8418. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26763) Using fileStatus cache when filterPartitions
[ https://issues.apache.org/jira/browse/SPARK-26763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-26763: --- Assignee: Xianyang Liu > Using fileStatus cache when filterPartitions > > > Key: SPARK-26763 > URL: https://issues.apache.org/jira/browse/SPARK-26763 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Xianyang Liu >Assignee: Xianyang Liu >Priority: Major > > We should pass the existing `fileStatusCache` to `InMemoryFileIndex` even > when there are no partition columns.
[jira] [Commented] (SPARK-24959) Do not invoke the CSV/JSON parser for empty schema
[ https://issues.apache.org/jira/browse/SPARK-24959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755088#comment-16755088 ] Sean Owen commented on SPARK-24959: --- Looks like this may need to be reverted: https://issues.apache.org/jira/browse/SPARK-26745 > Do not invoke the CSV/JSON parser for empty schema > -- > > Key: SPARK-24959 > URL: https://issues.apache.org/jira/browse/SPARK-24959 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.1 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Major > Fix For: 2.4.0 > > > Currently the JSON and CSV parsers are called even if the required schema is empty. > Invoking the parser for each line has some non-zero overhead. The action can > be skipped. Such an optimization should speed up count(), for example.
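The optimization amounts to short-circuiting the per-line parse when the required schema is empty, as in a bare count(). A hedged sketch with illustrative names, not Spark's actual code path:

```python
def count_rows(lines, required_schema, parse):
    """Count input rows; skip the parser entirely when no columns are
    actually required (hypothetical helper, not Spark's code)."""
    if not required_schema:
        # Empty schema (e.g. count()): every line is one row, no parsing.
        return sum(1 for _ in lines)
    return sum(1 for line in lines if parse(line) is not None)
```

The win is that for count()-style queries the parser's per-line overhead disappears completely; correctness is unchanged because no column values are needed.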
[jira] [Assigned] (SPARK-26771) Make .unpersist(), .destroy() consistently non-blocking by default
[ https://issues.apache.org/jira/browse/SPARK-26771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26771: Assignee: Apache Spark (was: Sean Owen) > Make .unpersist(), .destroy() consistently non-blocking by default > -- > > Key: SPARK-26771 > URL: https://issues.apache.org/jira/browse/SPARK-26771 > Project: Spark > Issue Type: Improvement > Components: GraphX, Spark Core >Affects Versions: 2.4.0 >Reporter: Sean Owen >Assignee: Apache Spark >Priority: Major > Labels: release-notes > > See https://issues.apache.org/jira/browse/SPARK-26728 and > https://github.com/apache/spark/pull/23650 . > RDD and DataFrame expose an .unpersist() method with optional "blocking" > argument. So does Broadcast.destroy(). This argument is false by default > except for the Scala RDD (not Pyspark) implementation and its GraphX > subclasses. Most usages of these methods request non-blocking behavior > already, and indeed, it's not typical to want to wait for the resources to be > freed, except in tests asserting behavior about these methods (where blocking > is typically requested). > This proposes to make the default false across these methods, and adjust > callers to only request non-default blocking behavior where important, such > as in a few key tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26772) YARNHadoopDelegationTokenManager should load ServiceCredentialProviders independently
[ https://issues.apache.org/jira/browse/SPARK-26772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26772: Assignee: Apache Spark > YARNHadoopDelegationTokenManager should load ServiceCredentialProviders > independently > - > > Key: SPARK-26772 > URL: https://issues.apache.org/jira/browse/SPARK-26772 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Assignee: Apache Spark >Priority: Major >
[jira] [Assigned] (SPARK-26772) YARNHadoopDelegationTokenManager should load ServiceCredentialProviders independently
[ https://issues.apache.org/jira/browse/SPARK-26772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26772: Assignee: (was: Apache Spark) > YARNHadoopDelegationTokenManager should load ServiceCredentialProviders > independently > - > > Key: SPARK-26772 > URL: https://issues.apache.org/jira/browse/SPARK-26772 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Priority: Major >
[jira] [Updated] (SPARK-26772) YARNHadoopDelegationTokenManager should load ServiceCredentialProviders independently
[ https://issues.apache.org/jira/browse/SPARK-26772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi updated SPARK-26772: -- Component/s: (was: Spark Core) YARN > YARNHadoopDelegationTokenManager should load ServiceCredentialProviders > independently > - > > Key: SPARK-26772 > URL: https://issues.apache.org/jira/browse/SPARK-26772 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Priority: Major >
[jira] [Assigned] (SPARK-26771) Make .unpersist(), .destroy() consistently non-blocking by default
[ https://issues.apache.org/jira/browse/SPARK-26771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26771: Assignee: Sean Owen (was: Apache Spark) > Make .unpersist(), .destroy() consistently non-blocking by default > -- > > Key: SPARK-26771 > URL: https://issues.apache.org/jira/browse/SPARK-26771 > Project: Spark > Issue Type: Improvement > Components: GraphX, Spark Core >Affects Versions: 2.4.0 >Reporter: Sean Owen >Assignee: Sean Owen >Priority: Major > Labels: release-notes > > See https://issues.apache.org/jira/browse/SPARK-26728 and > https://github.com/apache/spark/pull/23650 . > RDD and DataFrame expose an .unpersist() method with optional "blocking" > argument. So does Broadcast.destroy(). This argument is false by default > except for the Scala RDD (not Pyspark) implementation and its GraphX > subclasses. Most usages of these methods request non-blocking behavior > already, and indeed, it's not typical to want to wait for the resources to be > freed, except in tests asserting behavior about these methods (where blocking > is typically requested). > This proposes to make the default false across these methods, and adjust > callers to only request non-default blocking behavior where important, such > as in a few key tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26772) YARNHadoopDelegationTokenManager should load ServiceCredentialProviders independently
Gabor Somogyi created SPARK-26772: - Summary: YARNHadoopDelegationTokenManager should load ServiceCredentialProviders independently Key: SPARK-26772 URL: https://issues.apache.org/jira/browse/SPARK-26772 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.0 Reporter: Gabor Somogyi
[jira] [Resolved] (SPARK-26728) Make rdd.unpersist blocking configurable
[ https://issues.apache.org/jira/browse/SPARK-26728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-26728. --- Resolution: Won't Fix Closing this in favor of https://issues.apache.org/jira/browse/SPARK-26771 > Make rdd.unpersist blocking configurable > > > Key: SPARK-26728 > URL: https://issues.apache.org/jira/browse/SPARK-26728 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.4.0 >Reporter: liupengcheng >Priority: Major > > Currently, rdd.unpersist's blocking argument is set to true by default. > However, in actual production clusters (especially large ones), node loss or > network issues can always happen. > Users generally treat rdd.unpersist as non-exceptional, so sometimes the blocking > unpersist may cause a user's job failure; this has happened many times in our > cluster. > {code:java} > 2018-05-16,13:28:33,489 WARN org.apache.spark.storage.BlockManagerMaster: > Failed to remove RDD 15 - Failed to send RPC 7571440800577648876 to > c3-hadoop-prc-st2325.bj/10.136.136.25:43474: > java.nio.channels.ClosedChannelException > java.io.IOException: Failed to send RPC 7571440800577648876 to > c3-hadoop-prc-st2325.bj/10.136.136.25:43474: > java.nio.channels.ClosedChannelException > at > org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239) > at > org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226) > at > io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680) > at > io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567) > at > io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801) 
[jira] [Comment Edited] (SPARK-26728) Make rdd.unpersist blocking configurable
[ https://issues.apache.org/jira/browse/SPARK-26728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755069#comment-16755069 ] Sean Owen edited comment on SPARK-26728 at 1/29/19 2:45 PM: Closing this in favor of https://issues.apache.org/jira/browse/SPARK-26771 was (Author: srowen): Closing this in favor of https://issues.apache.org/jira/browse/SPARK-26728 > Make rdd.unpersist blocking configurable > > > Key: SPARK-26728 > URL: https://issues.apache.org/jira/browse/SPARK-26728 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.4.0 >Reporter: liupengcheng >Priority: Major > > Currently, rdd.unpersist's blocking argument is set to true by default. > However, in actual production clusters (especially large ones), node loss or > network issues can always happen. > Users generally treat rdd.unpersist as non-exceptional, so the blocking > unpersist can sometimes cause job failures; this has happened many times in > our cluster. 
> {code:java} > 2018-05-16,13:28:33,489 WARN org.apache.spark.storage.BlockManagerMaster: > Failed to remove RDD 15 - Failed to send RPC 7571440800577648876 to > c3-hadoop-prc-st2325.bj/10.136.136.25:43474: > java.nio.channels.ClosedChannelException > java.io.IOException: Failed to send RPC 7571440800577648876 to > c3-hadoop-prc-st2325.bj/10.136.136.25:43474: > java.nio.channels.ClosedChannelException > at > org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239) > at > org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226) > at > io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680) > at > io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567) > at > io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122) > at > io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633) > at > io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32) > at > io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908) > at > io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960) > at > io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at 
java.lang.Thread.run(Thread.java:745) > Caused by: java.nio.channels.ClosedChannelException > 2018-05-16,13:28:33,489 ERROR org.apache.spark.deploy.yarn.ApplicationMaster: > User class threw exception: java.io.IOException: Failed to send RPC > 7571440800577648876 to c3-hadoop-prc-st2325.bj/10.136.136.25:43474: > java.nio.channels.ClosedChannelException > java.io.IOException: Failed to send RPC 7571440800577648876 to > c3-hadoop-prc-st2325.bj/10.136.136.25:43474: > java.nio.channels.ClosedChannelException > at > org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239) > at > org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226) > at > io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680) > at > io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567) > at > io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801) > at > io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122) > at > io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633) > at > io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32) > at > io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908) >
[jira] [Created] (SPARK-26771) Make .unpersist(), .destroy() consistently non-blocking by default
Sean Owen created SPARK-26771: - Summary: Make .unpersist(), .destroy() consistently non-blocking by default Key: SPARK-26771 URL: https://issues.apache.org/jira/browse/SPARK-26771 Project: Spark Issue Type: Improvement Components: GraphX, Spark Core Affects Versions: 2.4.0 Reporter: Sean Owen Assignee: Sean Owen See https://issues.apache.org/jira/browse/SPARK-26728 and https://github.com/apache/spark/pull/23650 . RDD and DataFrame expose an .unpersist() method with optional "blocking" argument. So does Broadcast.destroy(). This argument is false by default except for the Scala RDD (not Pyspark) implementation and its GraphX subclasses. Most usages of these methods request non-blocking behavior already, and indeed, it's not typical to want to wait for the resources to be freed, except in tests asserting behavior about these methods (where blocking is typically requested). This proposes to make the default false across these methods, and adjust callers to only request non-default blocking behavior where important, such as in a few key tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
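The change described above is essentially flipping a default argument across `.unpersist()` and `.destroy()`. A minimal, self-contained sketch of the idea (the `CachedResource` class below is a hypothetical stand-in, not Spark's actual RDD/Broadcast code):

```scala
// Hypothetical stand-in for RDD/Broadcast; illustrates only the
// default-argument change proposed in SPARK-26771.
class CachedResource(val name: String) {
  private var persisted = true

  // Before: the Scala RDD used `blocking: Boolean = true` by default.
  // After this proposal, `false` is the default across these methods.
  def unpersist(blocking: Boolean = false): Unit = {
    persisted = false
    if (blocking) {
      // a real implementation would wait here for remote block managers
      // to confirm removal — exactly the step that can fail on node loss
      // (see SPARK-26728)
    }
  }

  def isPersisted: Boolean = persisted
}

val r = new CachedResource("rdd_15")
r.unpersist()                 // non-blocking by default under the proposal
val t = new CachedResource("test_rdd")
t.unpersist(blocking = true)  // tests asserting removal can still opt in
assert(!r.isPersisted && !t.isPersisted)
```

Callers that relied on the old blocking behavior would opt in explicitly, as the second call shows.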
[jira] [Commented] (SPARK-26727) CREATE OR REPLACE VIEW query fails with TableAlreadyExistsException
[ https://issues.apache.org/jira/browse/SPARK-26727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755065#comment-16755065 ] Bela Kovacs commented on SPARK-26727: - I could reproduce it with databricks, although took some while: while (true) spark.sql("create or replace view aaa as select * from aa") org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: AlreadyExistsException(message:Table aaa already exists); at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$withClient$1$$anonfun$apply$1.apply(HiveExternalCatalog.scala:150) at org.apache.spark.sql.hive.HiveExternalCatalog.org$apache$spark$sql$hive$HiveExternalCatalog$$maybeSynchronized(HiveExternalCatalog.scala:104) at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$withClient$1.apply(HiveExternalCatalog.scala:139) at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:330) at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:316) at com.databricks.spark.util.SparkDatabricksProgressReporter$.withStatusCode(ProgressReporter.scala:23) at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:137) at org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:298) at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:99) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:341) at com.databricks.sql.DatabricksSessionCatalog.createTable(DatabricksSessionCatalog.scala:125) at org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:176) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70) at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:81) at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:205) at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:205) at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:3423) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:91) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:227) at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:86) > CREATE OR REPLACE VIEW query fails with TableAlreadyExistsException > --- > > Key: SPARK-26727 > URL: https://issues.apache.org/jira/browse/SPARK-26727 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Srinivas Yarra >Priority: Major > > We experienced that sometimes the Hive query "CREATE OR REPLACE VIEW name> AS SELECT FROM " fails with the following exception: > {code:java} > // code placeholder > org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: Table or > view '' already exists in database 'default'; at > org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:314) > at > org.apache.spark.sql.execution.command.CreateViewCommand.run(views.scala:165) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79) > at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195) at > org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:195) at > org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365) at > org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) > at > 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) > at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364) at > org.apache.spark.sql.Dataset.(Dataset.scala:195) at > org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:80) at > org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642) ... 49 elided > {code} > {code} > scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy > FROM ae_dual") res1: org.apache.spark.sql.DataFrame = [] > scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as SELECT dummy > FROM ae_dual") res2: org.apache.spark.sql.DataFrame = [] > scala> spark.sql("CREATE OR REPLACE VIEW testSparkReplace as
[jira] [Updated] (SPARK-26739) Standardized Join Types for DataFrames
[ https://issues.apache.org/jira/browse/SPARK-26739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Skyler Lehan updated SPARK-26739: - Description: h3. *Q1.* What are you trying to do? Articulate your objectives using absolutely no jargon. Currently, in the join functions on [DataFrames|http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset], the join types are defined via a string parameter called joinType. In order for a developer to know which joins are possible, they must look up the API call for join. While this works fine, it can cause the developer to make a typo resulting in improper joins and/or unexpected errors that aren't evident at compile time. The objective of this improvement would be to allow developers to use a common definition for join types (by enum or constants) called JoinTypes. This would contain the possible joins and remove the possibility of a typo. It would also allow Spark to alter the names of the joins in the future without impacting end-users. h3. *Q2.* What problem is this proposal NOT designed to solve? The problem this solves is extremely narrow, it would not solve anything other than providing a common definition for join types. h3. *Q3.* How is it done today, and what are the limits of current practice? Currently, developers must join two DataFrames like so: {code:java} val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), "left_outer") {code} Where they manually type the join type. As stated above, this: * Requires developers to manually type in the join * Can cause possibility of typos * Restricts renaming of join types as its a literal string * Does not restrict and/or compile check the join type being used, leading to runtime errors h3. *Q4.* What is new in your approach and why do you think it will be successful? 
The new approach would use constants or *more preferably an enum*, something like this: {code:java} val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), JoinType.LEFT_OUTER) {code} This would provide: * In code reference/definitions of the possible join types ** This subsequently allows the addition of scaladoc of what each join type does and how it operates * Removes possibilities of a typo on the join type * Provides compile time checking of the join type (only if an enum is used) To clarify, if JoinType is a constant, it would just fill in the joinType string parameter for users. If an enum is used, it would restrict the domain of possible join types to whatever is defined in the future JoinType enum. The enum is preferred, however it would take longer to implement. h3. *Q5.* Who cares? If you are successful, what difference will it make? Developers using Apache Spark will care. This will make the join function easier to wield and lead to less runtime errors. It will save time by bringing join type validation at compile time. It will also provide in code reference to the join types, which saves the developer time of having to look up and navigate the multiple join functions to find the possible join types. In addition to that, the resulting constants/enum would have documentation on how that join type works. h3. *Q6.* What are the risks? Users of Apache Spark who currently use strings to define their join types could be impacted if an enum is chosen as the common definition. This risk can be mitigated by using string constants. The string constants would be the exact same string as the string literals used today. For example: {code:java} JoinType.INNER = "inner" {code} If an enum is still the preferred way of defining the join types, new join functions could be added that take in these enums and the join calls that contain string parameters for joinType could be deprecated. This would give developers a chance to change over to the new join types. 
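For concreteness, one possible shape of the proposed JoinType definition as a Scala sealed hierarchy (a sketch only: the names mirror today's string literals, and the final design — enum vs. string constants — is exactly what this ticket leaves open):

```scala
// Hypothetical sketch: each case object carries the string literal
// Spark's join currently accepts, so either form can be bridged to
// the existing `joinType: String` parameter.
sealed abstract class JoinType(val sql: String)

object JoinType {
  case object Inner      extends JoinType("inner")
  case object Cross      extends JoinType("cross")
  case object LeftOuter  extends JoinType("left_outer")
  case object RightOuter extends JoinType("right_outer")
  case object FullOuter  extends JoinType("full_outer")
  case object LeftSemi   extends JoinType("left_semi")
  case object LeftAnti   extends JoinType("left_anti")
}

// Bridging to the current API would then look like:
//   leftDF.join(rightDF, col("ID") === col("RightID"), JoinType.LeftOuter.sql)
// A typo such as JoinType.LeftOutter fails at compile time, whereas the
// string "left_outter" only fails at runtime.
assert(JoinType.LeftOuter.sql == "left_outer")
```

Because the trait is sealed, pattern matches over join types would also be checked for exhaustiveness by the compiler, which string literals cannot offer.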
h3. *Q7.* How long will it take? A few days for a seasoned Spark developer. h3. *Q8.* What are the mid-term and final "exams" to check for success? Mid-term exam would be the addition of a common definition of the join types and additional join functions that take in the join type enum/constant. The final exam would be working tests written to check the functionality of these new join functions and the join functions that take a string for joinType would be deprecated. h3. *Appendix A.* Proposed API Changes. Optional section defining APIs changes, if any. Backward and forward compatibility must be taken into account. {color:#FF}*It is heavily recommended that enums, and not string constants are used.*{color} String constants are presented as a possible solution but not the ideal solution. *If enums are used (preferred):* The following join function signatures would be added to the Dataset API: {code:java} def join(right: Dataset[_], joinExprs: Column, joinType: JoinType): DataFrame def join(right: Dataset[_], usingColumns:
[jira] [Commented] (SPARK-26739) Standardized Join Types for DataFrames
[ https://issues.apache.org/jira/browse/SPARK-26739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755056#comment-16755056 ] Skyler Lehan commented on SPARK-26739: -- While constants are possible, they're not ideal: unlike an enum, a string constant can't be compile-time checked to make sure it's a valid join. > Standardized Join Types for DataFrames > -- > > Key: SPARK-26739 > URL: https://issues.apache.org/jira/browse/SPARK-26739 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Skyler Lehan >Priority: Minor > Original Estimate: 48h > Remaining Estimate: 48h > > h3. *Q1.* What are you trying to do? Articulate your objectives using > absolutely no jargon. > Currently, in the join functions on > [DataFrames|http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset], > the join types are defined via a string parameter called joinType. In order > for a developer to know which joins are possible, they must look up the API > call for join. While this works fine, it can cause the developer to make a > typo resulting in improper joins and/or unexpected errors that aren't evident > at compile time. The objective of this improvement would be to allow > developers to use a common definition for join types (by enum or constants) > called JoinTypes. This would contain the possible joins and remove the > possibility of a typo. It would also allow Spark to alter the names of the > joins in the future without impacting end-users. > h3. *Q2.* What problem is this proposal NOT designed to solve? > The problem this solves is extremely narrow, it would not solve anything > other than providing a common definition for join types. > h3. *Q3.* How is it done today, and what are the limits of current practice? 
> Currently, developers must join two DataFrames like so: > {code:java} > val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), > "left_outer") > {code} > Where they manually type the join type. As stated above, this: > * Requires developers to manually type in the join > * Can cause possibility of typos > * Restricts renaming of join types as its a literal string > * Does not restrict and/or compile check the join type being used, leading > to runtime errors > h3. *Q4.* What is new in your approach and why do you think it will be > successful? > The new approach would use constants, something like this: > {code:java} > val resultDF = leftDF.join(rightDF, col("ID") === col("RightID"), > JoinType.LEFT_OUTER) > {code} > This would provide: > * In code reference/definitions of the possible join types > ** This subsequently allows the addition of scaladoc of what each join type > does and how it operates > * Removes possibilities of a typo on the join type > * Provides compile time checking of the join type (only if an enum is used) > To clarify, if JoinType is a constant, it would just fill in the joinType > string parameter for users. If an enum is used, it would restrict the domain > of possible join types to whatever is defined in the future JoinType enum. > The enum is preferred, however it would take longer to implement. > h3. *Q5.* Who cares? If you are successful, what difference will it make? > Developers using Apache Spark will care. This will make the join function > easier to wield and lead to less runtime errors. It will save time by > bringing join type validation at compile time. It will also provide in code > reference to the join types, which saves the developer time of having to look > up and navigate the multiple join functions to find the possible join types. > In addition to that, the resulting constants/enum would have documentation on > how that join type works. > h3. *Q6.* What are the risks? 
> Users of Apache Spark who currently use strings to define their join types > could be impacted if an enum is chosen as the common definition. This risk > can be mitigated by using string constants. The string constants would be the > exact same string as the string literals used today. For example: > {code:java} > JoinType.INNER = "inner" > {code} > If an enum is still the preferred way of defining the join types, new join > functions could be added that take in these enums and the join calls that > contain string parameters for joinType could be deprecated. This would give > developers a chance to change over to the new join types. > h3. *Q7.* How long will it take? > A few days for a seasoned Spark developer. > h3. *Q8.* What are the mid-term and final "exams" to check for success? > Mid-term exam would be the addition of a common definition of the join types > and additional join functions that take in the join type enum/constant. The > final exam
[jira] [Updated] (SPARK-26766) Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens
[ https://issues.apache.org/jira/browse/SPARK-26766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi updated SPARK-26766: -- Priority: Minor (was: Major) > Remove the list of filesystems from > HadoopDelegationTokenProvider.obtainDelegationTokens > > > Key: SPARK-26766 > URL: https://issues.apache.org/jira/browse/SPARK-26766 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Priority: Minor > > This was discussed in previous PR > [here|https://github.com/apache/spark/pull/23499/files#diff-406f99efa37915001b613de47815e25cR54].
[jira] [Created] (SPARK-26770) Misleading/unhelpful error message when wrapping a null in an Option
sam created SPARK-26770: --- Summary: Misleading/unhelpful error message when wrapping a null in an Option Key: SPARK-26770 URL: https://issues.apache.org/jira/browse/SPARK-26770 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.2 Reporter: sam This {code} // Using options to indicate nullable fields case class Product(productID: Option[Int], productName: Option[String]) val productExtract: Dataset[Product] = spark.createDataset(Seq( Product( productID = Some(6050286), // user mistake here, should be `None` not `Some(null)` productName = Some(null) ))) productExtract.count() {code} will give an error like the one below. This error is thrown from quite deep down, but there should be some handling logic further up that checks for nulls and gives a more informative error message. E.g. it could tell the user which field is null, or it could detect the `Some(null)` mistake and suggest using `None`. Whatever exception is thrown, it shouldn't be an NPE: since this is clearly a user error, it should be some kind of user-error exception. 
{code} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 1.0 (TID 276, 10.139.64.8, executor 1): java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:194) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:620) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:112) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:384) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} I've seen quite a few other people with this error, but I don't think it's for the same reason: https://docs.databricks.com/spark/latest/data-sources/tips/redshift-npe.html https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/Dt6ilC9Dn54 https://issues.apache.org/jira/browse/SPARK-17195 https://issues.apache.org/jira/browse/SPARK-18859 
https://github.com/datastax/spark-cassandra-connector/issues/1062 https://stackoverflow.com/questions/39875711/spark-sql-2-0-nullpointerexception-with-a-valid-postgresql-query
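Until better error handling exists, the practical workaround is worth noting: Scala's `Option.apply` already performs the null check the reporter wants, so wrapping possibly-null values with `Option(x)` rather than `Some(x)` avoids this trap entirely. A small illustration (plain Scala, no Spark required):

```scala
// Option(x) null-checks its argument; Some(x) does not — it wraps null as-is.
val safe: Option[String] = Option(null)  // becomes None
val trap: Option[String] = Some(null)    // Some(null): the bug in this ticket

assert(safe.isEmpty)
assert(trap.isDefined && trap.get == null)  // a "present" value that is null → NPE later

// The reporter's field, written safely, assuming a hypothetical
// possibly-null `maybeName` variable:
val maybeName: String = null
val productName: Option[String] = Option(maybeName)  // None, not Some(null)
assert(productName.isEmpty)
```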
[jira] [Resolved] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
[ https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Takeshi Yamamuro resolved SPARK-26708. -- Resolution: Fixed Fix Version/s: 3.0.0 2.4.1 Resolved by https://github.com/apache/spark/pull/23644 > Incorrect result caused by inconsistency between a SQL cache's cached RDD and > its physical plan > --- > > Key: SPARK-26708 > URL: https://issues.apache.org/jira/browse/SPARK-26708 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Xiao Li >Assignee: Maryann Xue >Priority: Blocker > Labels: correctness > Fix For: 2.4.1, 3.0.0 > > > When performing non-cascading cache invalidation, {{recache}} is called on > the other cache entries which are dependent on the cache being invalidated. > It leads to the physical plans of those cache entries being re-compiled. > For those cache entries, if the cache RDD has already been persisted, chances > are there will be inconsistency between the data and the new plan. It can > cause a correctness issue if the new plan's {{outputPartitioning}} or > {{outputOrdering}} is different from that of the actual data, and > meanwhile the cache is used by another query that asks for a specific > {{outputPartitioning}} or {{outputOrdering}} which happens to match the new > plan but not the actual data.
[jira] [Created] (SPARK-26769) partition pruning in inner join
nhufas created SPARK-26769: -- Summary: partition pruning in inner join Key: SPARK-26769 URL: https://issues.apache.org/jira/browse/SPARK-26769 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.0 Reporter: nhufas When joining a partitioned parquet table with another table on the partition column, Spark should prune partitions of the partitioned table based on the other table's values. Example: tableA is a parquet table partitioned by part_filter; tableB is a table whose single column holds partition values. tableA has partitions part_A, part_B, part_C, part_D; tableB has 2 rows with part_A and part_B as values. Running select * from tableA inner join tableB on tableA.part_filter=tableB.part_filter should trigger partition pruning on tableA based on tableB's values (in this case scanning only 2 partitions), but instead it reads all 4 partitions of tableA and only filters the results. Note: this kind of approach works in Hive (filtering tableA's partitions)
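The requested behavior can be illustrated in plain Scala, modeling partitions as a map from partition value to rows (purely illustrative, not Spark's scan machinery; the optimization requested here is in the spirit of what is generally called dynamic partition pruning):

```scala
// tableA: partitioned by part_filter; each map entry is one partition.
val tableA: Map[String, Seq[Int]] = Map(
  "part_A" -> Seq(1, 2),
  "part_B" -> Seq(3),
  "part_C" -> Seq(4, 5),
  "part_D" -> Seq(6))

// tableB: the small side of the join, two rows of partition values.
val tableB: Seq[String] = Seq("part_A", "part_B")

// What the report describes Spark doing today: touch all 4 partitions,
// then filter the joined result.
val scannedAll = tableA.toSeq                                   // 4 partition scans
val filtered   = scannedAll.filter { case (k, _) => tableB.contains(k) }

// What is being asked for: evaluate tableB first, then scan only the
// matching partitions (2 scans here), as Hive does.
val wanted = tableB.toSet
val pruned = tableA.filter { case (k, _) => wanted(k) }         // 2 partition scans

assert(filtered.toMap == pruned)
assert(pruned.keySet == Set("part_A", "part_B"))
```

Both paths produce the same rows; the difference is how many partitions are read to get there.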
[jira] [Updated] (SPARK-26765) Avro: Validate input and output schema
[ https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang updated SPARK-26765: --- Summary: Avro: Validate input and output schema (was: Implement supportDataType API in Avro data source) > Avro: Validate input and output schema > -- > > Key: SPARK-26765 > URL: https://issues.apache.org/jira/browse/SPARK-26765 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Priority: Minor >
[jira] [Updated] (SPARK-26765) Avro: Validate input and output schema
[ https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang updated SPARK-26765: --- Description: The API supportDataType in FileFormat helps to validate the output/input schema before execution starts, so that we can avoid some invalid data source IO and users can see clean error messages. This PR is to override the validation API in the Avro data source. Also, as per the Avro spec (https://avro.apache.org/docs/1.8.2/spec.html), NullType is supported. This PR fixes the handling of NullType. > Avro: Validate input and output schema > -- > > Key: SPARK-26765 > URL: https://issues.apache.org/jira/browse/SPARK-26765 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Priority: Minor > > The API supportDataType in FileFormat helps to validate the output/input > schema before execution starts, so that we can avoid some invalid data source > IO and users can see clean error messages. > This PR is to override the validation API in the Avro data source. > Also, as per the Avro spec (https://avro.apache.org/docs/1.8.2/spec.html), > NullType is supported. This PR fixes the handling of NullType.
[jira] [Updated] (SPARK-26768) Remove useless code in BlockManager
[ https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] liupengcheng updated SPARK-26768: - Description: Recently, while reading the code of `BlockManager.getBlockData`, I found some unreachable code. The relevant code is as follows: {code:java} override def getBlockData(blockId: BlockId): ManagedBuffer = { if (blockId.isShuffle) { shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]) } else { getLocalBytes(blockId) match { case Some(blockData) => new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true) case None => // If this block manager receives a request for a block that it doesn't have then it's // likely that the master has outdated block statuses for this block. Therefore, we send // an RPC so that this block is marked as being unavailable from this block manager. reportBlockStatus(blockId, BlockStatus.empty) throw new BlockNotFoundException(blockId.toString) } } } {code} {code:java} def getLocalBytes(blockId: BlockId): Option[BlockData] = { logDebug(s"Getting local block $blockId as bytes") // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { val shuffleBlockResolver = shuffleManager.shuffleBlockResolver // TODO: This should gracefully handle case where local block is not available. Currently // downstream code will throw an exception. val buf = new ChunkedByteBuffer( shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) Some(new ByteBufferBlockData(buf, true)) } else { blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) } } } {code} `blockId.isShuffle` is checked twice; however, in the method call hierarchy of `BlockManager.getLocalBytes`, the only other call site is `TorrentBroadcast.readBlocks`, where the blockId can never be a `ShuffleBlockId`. !Selection_037.jpg! So I think we should remove this unreachable code to make it easier to read. was: Recently, while reading the code of `BlockManager.getBlockData`, I found some unreachable code. The relevant code is as follows: {code:java} override def getBlockData(blockId: BlockId): ManagedBuffer = { if (blockId.isShuffle) { shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]) } else { getLocalBytes(blockId) match { case Some(blockData) => new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true) case None => // If this block manager receives a request for a block that it doesn't have then it's // likely that the master has outdated block statuses for this block. Therefore, we send // an RPC so that this block is marked as being unavailable from this block manager. reportBlockStatus(blockId, BlockStatus.empty) throw new BlockNotFoundException(blockId.toString) } } } {code} {code:java} def getLocalBytes(blockId: BlockId): Option[BlockData] = { logDebug(s"Getting local block $blockId as bytes") // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { val shuffleBlockResolver = shuffleManager.shuffleBlockResolver // TODO: This should gracefully handle case where local block is not available. Currently // downstream code will throw an exception. val buf = new ChunkedByteBuffer( shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) Some(new ByteBufferBlockData(buf, true)) } else { blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) } } } {code} `blockId.isShuffle` is checked twice; however, in the method call hierarchy of `BlockManager.getLocalBytes`, the only other call site is `TorrentBroadcast.readBlocks`, where the blockId can never be a `ShuffleBlockId`. So I think we should remove this unreachable code to make it easier to read. > Remove useless code in BlockManager > --- > > Key: SPARK-26768 > URL: https://issues.apache.org/jira/browse/SPARK-26768 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: liupengcheng >Priority: Major > Attachments: Selection_037.jpg > > > Recently, when I was
[jira] [Updated] (SPARK-26768) Remove useless code in BlockManager
[ https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] liupengcheng updated SPARK-26768: - Attachment: Selection_037.jpg > Remove useless code in BlockManager > --- > > Key: SPARK-26768 > URL: https://issues.apache.org/jira/browse/SPARK-26768 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: liupengcheng >Priority: Major > Attachments: Selection_037.jpg > > > Recently, while reading the code of `BlockManager.getBlockData`, I found > some unreachable code. The relevant code is as follows: > > {code:java} > override def getBlockData(blockId: BlockId): ManagedBuffer = { > if (blockId.isShuffle) { > > shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]) > } else { > getLocalBytes(blockId) match { > case Some(blockData) => > new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, > true) > case None => > // If this block manager receives a request for a block that it > doesn't have then it's > // likely that the master has outdated block statuses for this block. > Therefore, we send > // an RPC so that this block is marked as being unavailable from this > block manager. > reportBlockStatus(blockId, BlockStatus.empty) > throw new BlockNotFoundException(blockId.toString) > } > } > } > {code} > {code:java} > def getLocalBytes(blockId: BlockId): Option[BlockData] = { > logDebug(s"Getting local block $blockId as bytes") > // As an optimization for map output fetches, if the block is for a > shuffle, return it > // without acquiring a lock; the disk store never deletes (recent) items so > this should work > if (blockId.isShuffle) { > val shuffleBlockResolver = shuffleManager.shuffleBlockResolver > // TODO: This should gracefully handle case where local block is not > available. Currently > // downstream code will throw an exception. 
> val buf = new ChunkedByteBuffer( > > shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) > Some(new ByteBufferBlockData(buf, true)) > } else { > blockInfoManager.lockForReading(blockId).map { info => > doGetLocalBytes(blockId, info) } > } > } > {code} > `blockId.isShuffle` is checked twice; however, in the method call hierarchy > of `BlockManager.getLocalBytes`, the only other call site is > `TorrentBroadcast.readBlocks`, where the blockId can never be a > `ShuffleBlockId`. > > So I think we should remove this unreachable code to make it easier to read. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26768) Remove useless code in BlockManager
liupengcheng created SPARK-26768: Summary: Remove useless code in BlockManager Key: SPARK-26768 URL: https://issues.apache.org/jira/browse/SPARK-26768 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.0 Reporter: liupengcheng Recently, while reading the code of `BlockManager.getBlockData`, I found some unreachable code. The relevant code is as follows: {code:java} override def getBlockData(blockId: BlockId): ManagedBuffer = { if (blockId.isShuffle) { shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]) } else { getLocalBytes(blockId) match { case Some(blockData) => new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true) case None => // If this block manager receives a request for a block that it doesn't have then it's // likely that the master has outdated block statuses for this block. Therefore, we send // an RPC so that this block is marked as being unavailable from this block manager. reportBlockStatus(blockId, BlockStatus.empty) throw new BlockNotFoundException(blockId.toString) } } } {code} {code:java} def getLocalBytes(blockId: BlockId): Option[BlockData] = { logDebug(s"Getting local block $blockId as bytes") // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { val shuffleBlockResolver = shuffleManager.shuffleBlockResolver // TODO: This should gracefully handle case where local block is not available. Currently // downstream code will throw an exception. val buf = new ChunkedByteBuffer( shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) Some(new ByteBufferBlockData(buf, true)) } else { blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) } } } {code} `blockId.isShuffle` is checked twice; however, in the method call hierarchy of `BlockManager.getLocalBytes`, the only other call site is `TorrentBroadcast.readBlocks`, where the blockId can never be a `ShuffleBlockId`. So I think we should remove this unreachable code to make it easier to read. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
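The unreachability argument in this issue can be shown with a minimal model of the call structure. This is an illustrative Python sketch of the two-function shape (the real code is Scala and does real IO); the return strings are placeholders, not Spark behavior.

```python
# Minimal model of the structure described above: get_block_data handles the
# shuffle case itself, so its call into get_local_bytes can only ever see a
# non-shuffle block. The shuffle branch inside get_local_bytes is therefore
# dead for this caller, and the issue notes the only other caller
# (TorrentBroadcast.readBlocks) never passes a shuffle block either.

def get_block_data(block_id, is_shuffle):
    if is_shuffle:
        return "resolved via shuffleBlockResolver"   # shuffle handled here...
    return get_local_bytes(block_id, is_shuffle)     # ...so is_shuffle is False below

def get_local_bytes(block_id, is_shuffle):
    if is_shuffle:
        # Unreachable from get_block_data (filtered above) and, per the issue,
        # from the only other call site as well.
        return "shuffle bytes"
    return f"local bytes for {block_id}"

print(get_block_data("rdd_0_1", is_shuffle=False))  # prints: local bytes for rdd_0_1
```

Deleting the dead `if is_shuffle:` branch from `get_local_bytes` leaves every reachable path unchanged, which is exactly the cleanup the issue proposes.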
[jira] [Comment Edited] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result
[ https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754882#comment-16754882 ] Marco Gaido edited comment on SPARK-26767 at 1/29/19 11:13 AM: --- IIRC there was a similar JIRA reported. Maybe the problem is the same. The JIRA is SPARK-25420: please check the comments there. Your case may be the same. was (Author: mgaido): IIRC there was a similar JIRA reported. May you please try in a newer version (ideally current branch-2.3)? This may have been fixed. > Filter on a dropDuplicates dataframe gives inconsistency result > --- > > Key: SPARK-26767 > URL: https://issues.apache.org/jira/browse/SPARK-26767 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 > Environment: To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. >Reporter: Jeffrey >Priority: Major > > To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result
[ https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754882#comment-16754882 ] Marco Gaido commented on SPARK-26767: - IIRC there was a similar JIRA reported. May you please try in a newer version (ideally current branch-2.3)? This may have been fixed. > Filter on a dropDuplicates dataframe gives inconsistency result > --- > > Key: SPARK-26767 > URL: https://issues.apache.org/jira/browse/SPARK-26767 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 > Environment: To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. >Reporter: Jeffrey >Priority: Major > > To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
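One plausible mechanism for the reported symptom (whether or not it matches the linked JIRA) is that `dropDuplicates` on a column subset keeps one arbitrary row per key, so columns outside the subset (colG, colH in the report) depend on which row survives; if partition or arrival order differs between runs, a later filter on those columns can match a different number of rows. The sketch below is a hedged pure-Python model, not PySpark: plain list order stands in for Spark's nondeterministic partition ordering.

```python
# Model: dedup by a key subset with "first row wins". Which row wins depends
# on arrival order, so a filter on non-key columns gives different counts.

def drop_duplicates(rows, subset):
    seen = {}
    for row in rows:  # order-dependent: the first row per key is kept
        key = tuple(row[c] for c in subset)
        seen.setdefault(key, row)
    return list(seen.values())

rows = [
    {"colA": "A", "colB": "B", "colG": "G", "colH": "H"},
    {"colA": "A", "colB": "B", "colG": "X", "colH": "Y"},  # same key, other values
]

run1 = drop_duplicates(rows, ["colA", "colB"])                 # first row survives
run2 = drop_duplicates(list(reversed(rows)), ["colA", "colB"]) # other row survives

match1 = [r for r in run1 if r["colG"] == "G" and r["colH"] == "H"]
match2 = [r for r in run2 if r["colG"] == "G" and r["colH"] == "H"]
print(len(match1), len(match2))  # prints: 1 0
```

Same query, same data, different result counts, driven purely by which duplicate survived the dedup.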
[jira] [Updated] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result
[ https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated SPARK-26767: - Component/s: (was: Build) SQL > Filter on a dropDuplicates dataframe gives inconsistency result > --- > > Key: SPARK-26767 > URL: https://issues.apache.org/jira/browse/SPARK-26767 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 > Environment: To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. >Reporter: Jeffrey >Priority: Major > > To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result
[ https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated SPARK-26767: - Priority: Major (was: Blocker) > Filter on a dropDuplicates dataframe gives inconsistency result > --- > > Key: SPARK-26767 > URL: https://issues.apache.org/jira/browse/SPARK-26767 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 2.3.0 > Environment: To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. >Reporter: Jeffrey >Priority: Major > > To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result
[ https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754880#comment-16754880 ] Hyukjin Kwon commented on SPARK-26767: -- Please avoid to set Critical+ which is usually reserved for committers. > Filter on a dropDuplicates dataframe gives inconsistency result > --- > > Key: SPARK-26767 > URL: https://issues.apache.org/jira/browse/SPARK-26767 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 > Environment: To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. >Reporter: Jeffrey >Priority: Major > > To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26766) Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens
[ https://issues.apache.org/jira/browse/SPARK-26766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754865#comment-16754865 ] Gabor Somogyi commented on SPARK-26766: --- [~vanzin] I was thinking about your [suggestion|https://github.com/apache/spark/pull/23499/files#diff-406f99efa37915001b613de47815e25cR54] and here are my findings. * YarnSparkHadoopUtil.hadoopFSsToAccess covers everything that is needed, as you suggested * On the other hand, it's placed in YARN, which makes it inaccessible from core * I don't think it's a good idea to move either the mentioned function or the token provider I see mainly the following options: * The token provider asks the manager for file systems: I would prefer this * Use some sort of init function where parameters can be passed: not all providers are interested in this parameter (actually only the filesystem provider) * Split the token provider into YARN... Mesos... and implement the filesystem-providing functions there: I think that's overkill Unless you have a better idea, I'll go with the first one. > Remove the list of filesystems from > HadoopDelegationTokenProvider.obtainDelegationTokens > > > Key: SPARK-26766 > URL: https://issues.apache.org/jira/browse/SPARK-26766 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Priority: Major > > This was discussed in a previous PR > [here|https://github.com/apache/spark/pull/23499/files#diff-406f99efa37915001b613de47815e25cR54]. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
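The preferred option in the comment above (the provider asks the manager for file systems, instead of every provider receiving the list as a parameter) can be sketched as follows. This is a hypothetical Python model: the class and method names (`TokenManager`, `filesystems_to_access`, the two providers) are assumptions for illustration, not Spark's actual Scala API.

```python
# Sketch of "token provider asks the manager": the filesystem list lives on
# the manager (centralising the role of YarnSparkHadoopUtil.hadoopFSsToAccess),
# and only the one provider that cares asks for it. Other providers ignore it.

class TokenManager:
    def __init__(self, filesystems, providers):
        self._filesystems = filesystems
        self._providers = providers

    def filesystems_to_access(self):
        # Single place that knows which filesystems need delegation tokens.
        return list(self._filesystems)

    def obtain_tokens(self):
        # Providers no longer take a filesystem-list parameter; they get the
        # manager back and may query it if, and only if, they need to.
        return [p.obtain_delegation_tokens(self) for p in self._providers]

class HBaseProvider:
    def obtain_delegation_tokens(self, manager):
        return "hbase-token"  # never touches the filesystem list

class HadoopFSProvider:
    def obtain_delegation_tokens(self, manager):
        fss = manager.filesystems_to_access()  # the only provider that asks
        return [f"fs-token:{fs}" for fs in fss]

mgr = TokenManager(["hdfs://nn1", "hdfs://nn2"], [HBaseProvider(), HadoopFSProvider()])
print(mgr.obtain_tokens())
```

This shows why the option avoids the drawback of the init-function approach: the parameter disappears from the shared interface instead of being passed to providers that ignore it.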
[jira] [Updated] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result
[ https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeffrey updated SPARK-26767: Description: To repeat the problem, (1) create a csv file with records holding same values for a subset of columns (e.g. colA, colB, colC). (2) read the csv file as a spark dataframe and then use dropDuplicates to dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 'A' and colB='B' and colG='G' and colH='H').show(100,False)) => When (3) is rerun, it gives different number of resulting rows. was:Fe > Filter on a dropDuplicates dataframe gives inconsistency result > --- > > Key: SPARK-26767 > URL: https://issues.apache.org/jira/browse/SPARK-26767 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 2.3.0 > Environment: To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. >Reporter: Jeffrey >Priority: Blocker > > To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result
[ https://issues.apache.org/jira/browse/SPARK-26767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeffrey updated SPARK-26767: Description: To repeat the problem, (1) create a csv file with records holding same values for a subset of columns (e.g. colA, colB, colC). (2) read the csv file as a spark dataframe and then use dropDuplicates to dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 'A' and colB='B' and colG='G' and colH='H').show(100,False)) => When (3) is rerun, it gives different number of resulting rows. was: o repeat the problem, (1) create a csv file with records holding same values for a subset of columns (e.g. colA, colB, colC). (2) read the csv file as a spark dataframe and then use dropDuplicates to dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 'A' and colB='B' and colG='G' and colH='H').show(100,False)) => When (3) is rerun, it gives different number of resulting rows. > Filter on a dropDuplicates dataframe gives inconsistency result > --- > > Key: SPARK-26767 > URL: https://issues.apache.org/jira/browse/SPARK-26767 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 2.3.0 > Environment: To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. 
>Reporter: Jeffrey >Priority: Blocker > > To repeat the problem, > (1) create a csv file with records holding same values for a subset of > columns (e.g. colA, colB, colC). > (2) read the csv file as a spark dataframe and then use dropDuplicates to > dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) > (3) select the resulting dataframe with where clause. (i.e. df.where("colA = > 'A' and colB='B' and colG='G' and colH='H').show(100,False)) > > => When (3) is rerun, it gives different number of resulting rows. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26767) Filter on a dropDuplicates dataframe gives inconsistency result
Jeffrey created SPARK-26767: --- Summary: Filter on a dropDuplicates dataframe gives inconsistency result Key: SPARK-26767 URL: https://issues.apache.org/jira/browse/SPARK-26767 Project: Spark Issue Type: Bug Components: Build Affects Versions: 2.3.0 Environment: To repeat the problem, (1) create a csv file with records holding same values for a subset of columns (e.g. colA, colB, colC). (2) read the csv file as a spark dataframe and then use dropDuplicates to dedup the subset of columns (i.e. dropDuplicates(["colA", "colB", "colC"])) (3) select the resulting dataframe with where clause. (i.e. df.where("colA = 'A' and colB='B' and colG='G' and colH='H').show(100,False)) => When (3) is rerun, it gives different number of resulting rows. Reporter: Jeffrey Fe -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26766) Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens
Gabor Somogyi created SPARK-26766: - Summary: Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens Key: SPARK-26766 URL: https://issues.apache.org/jira/browse/SPARK-26766 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.0.0 Reporter: Gabor Somogyi This was discussed in previous PR [here|https://github.com/apache/spark/pull/23499/files#diff-406f99efa37915001b613de47815e25cR54]. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26765) Implement supportDataType API in Avro data source
Gengliang Wang created SPARK-26765: -- Summary: Implement supportDataType API in Avro data source Key: SPARK-26765 URL: https://issues.apache.org/jira/browse/SPARK-26765 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.0.0 Reporter: Gengliang Wang -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26765) Implement supportDataType API in Avro data source
[ https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26765: Assignee: (was: Apache Spark) > Implement supportDataType API in Avro data source > - > > Key: SPARK-26765 > URL: https://issues.apache.org/jira/browse/SPARK-26765 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26765) Implement supportDataType API in Avro data source
[ https://issues.apache.org/jira/browse/SPARK-26765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26765: Assignee: Apache Spark > Implement supportDataType API in Avro data source > - > > Key: SPARK-26765 > URL: https://issues.apache.org/jira/browse/SPARK-26765 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Assignee: Apache Spark >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26764) [SPIP] Spark Relational Cache
Adrian Wang created SPARK-26764: --- Summary: [SPIP] Spark Relational Cache Key: SPARK-26764 URL: https://issues.apache.org/jira/browse/SPARK-26764 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.4.0 Reporter: Adrian Wang Attachments: Relational+Cache+SPIP.pdf In modern database systems, a relational cache is a common technique for speeding up ad-hoc queries. While Spark provides caching natively, Spark SQL should be able to utilize the relationships between relations to speed up all possible queries. In this SPIP, we will make Spark able to utilize all defined cached relations where possible, without explicit substitution in the user query, and also keep some user-defined caches available across sessions. Materialized views in many database systems provide a similar function. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26764) [SPIP] Spark Relational Cache
[ https://issues.apache.org/jira/browse/SPARK-26764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adrian Wang updated SPARK-26764: Attachment: Relational+Cache+SPIP.pdf > [SPIP] Spark Relational Cache > - > > Key: SPARK-26764 > URL: https://issues.apache.org/jira/browse/SPARK-26764 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.4.0 >Reporter: Adrian Wang >Priority: Major > Attachments: Relational+Cache+SPIP.pdf > > > In modern database systems, a relational cache is a common technique for > speeding up ad-hoc queries. While Spark provides caching natively, Spark SQL > should be able to utilize the relationships between relations to speed up all > possible queries. In this SPIP, we will make Spark able to utilize all > defined cached relations where possible, without explicit substitution in the > user query, and also keep some user-defined caches available across sessions. > Materialized views in many database systems provide a similar function. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26752) Multiple aggregate methods in the same column in DataFrame
[ https://issues.apache.org/jira/browse/SPARK-26752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26752. -- Resolution: Won't Fix > Multiple aggregate methods in the same column in DataFrame > -- > > Key: SPARK-26752 > URL: https://issues.apache.org/jira/browse/SPARK-26752 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Guilherme Beltramini >Priority: Minor > > The agg function in > [org.apache.spark.sql.RelationalGroupedDataset|https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.RelationalGroupedDataset] > accepts as input: > * Column* > * Map[String, String] > * (String, String)* > I'm proposing to add Map[String, Seq[String]], where the keys are the columns > to aggregate, and the values are the functions to apply the aggregation. Here > is a similar question: > http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-multiple-agg-on-the-same-column-td29541.html. > In the example below (running in spark-shell, with Spark 2.4.0), I'm showing > a workaround. 
What I'm proposing is that agg should accept aggMap as input: > {code:java} > scala> val df = Seq(("a", 1), ("a", 2), ("a", 3), ("a", 4), ("b", 10), ("b", > 20), ("c", 100)).toDF("col1", "col2") > df: org.apache.spark.sql.DataFrame = [col1: string, col2: int] > scala> df.show > +++ > |col1|col2| > +++ > | a| 1| > | a| 2| > | a| 3| > | a| 4| > | b| 10| > | b| 20| > | c| 100| > +++ > scala> val aggMap = Map("col1" -> Seq("count"), "col2" -> Seq("min", "max", > "mean")) > aggMap: scala.collection.immutable.Map[String,Seq[String]] = Map(col1 -> > List(count), col2 -> List(min, max, mean)) > scala> val aggSeq = aggMap.toSeq.flatMap{ case (c: String, fns: Seq[String]) > => Seq(c).zipAll(fns, c, "") } > aggSeq: Seq[(String, String)] = ArrayBuffer((col1,count), (col2,min), > (col2,max), (col2,mean)) > scala> val dfAgg = df.groupBy("col1").agg(aggSeq.head, aggSeq.tail: _*) > dfAgg: org.apache.spark.sql.DataFrame = [col1: string, count(col1): bigint > ... 3 more fields] > scala> dfAgg.orderBy("col1").show > ++---+-+-+-+ > |col1|count(col1)|min(col2)|max(col2)|avg(col2)| > ++---+-+-+-+ > | a| 4|1|4| 2.5| > | b| 2| 10| 20| 15.0| > | c| 1| 100| 100|100.0| > ++---+-+-+-+ > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
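The Scala workaround in the snippet above flattens the `Map[String, Seq[String]]` into `(column, function)` pairs with a `zipAll` trick; the same flattening reads more directly as a comprehension. The Python sketch below is purely illustrative (the function name `flatten_agg_map` is hypothetical, and this is not a Spark API), showing the shape of the data the proposed overload would accept and what the existing pairwise `agg` overload needs.

```python
# Flatten {column: [functions]} into (column, function) pairs, the form the
# existing pairwise agg(String, String)* overload consumes.

def flatten_agg_map(agg_map):
    return [(col, fn) for col, fns in agg_map.items() for fn in fns]

agg_map = {"col1": ["count"], "col2": ["min", "max", "mean"]}
pairs = flatten_agg_map(agg_map)
print(pairs)
# In the Scala example this is what feeds
# df.groupBy("col1").agg(aggSeq.head, aggSeq.tail: _*)
```

The proposal in the ticket amounts to doing this flattening inside `agg` itself, so callers could pass the map directly.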
[jira] [Updated] (SPARK-26760) [Spark Incorrect display in SPARK UI Executor Tab when number of cores is 4 and Active Task display as 5 in Executor Tab of SPARK UI]
[ https://issues.apache.org/jira/browse/SPARK-26760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ABHISHEK KUMAR GUPTA updated SPARK-26760: - Summary: [Spark Incorrect display in SPARK UI Executor Tab when number of cores is 4 and Active Task display as 5 in Executor Tab of SPARK UI] (was: [Spark Incorrect display in YARN UI Executor Tab when number of cores is 4 and Active Task display as 5 in Executor Tab of YARN UI]) > [Spark Incorrect display in SPARK UI Executor Tab when number of cores is 4 > and Active Task display as 5 in Executor Tab of SPARK UI] > - > > Key: SPARK-26760 > URL: https://issues.apache.org/jira/browse/SPARK-26760 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 > Environment: Spark 2.4 >Reporter: ABHISHEK KUMAR GUPTA >Priority: Major > Attachments: SPARK-26760.png > > > Steps: > # Launch Spark Shell > # bin/spark-shell --master yarn --conf spark.dynamicAllocation.enabled=true > --conf spark.dynamicAllocation.initialExecutors=3 --conf > spark.dynamicAllocation.minExecutors=1 --conf > spark.dynamicAllocation.executorIdleTimeout=60s --conf > spark.dynamicAllocation.maxExecutors=5 > # Submit a Job sc.parallelize(1 to 1,116000).count() > # Check the YARN UI Executor Tab for the RUNNING application > # The UI displays the number of cores as 4 while the Active Tasks column shows 5 > Expected: > The number of active tasks should be the same as the number of cores. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org