[jira] [Commented] (SPARK-26390) ColumnPruning rule should only do column pruning
[ https://issues.apache.org/jira/browse/SPARK-26390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723773#comment-16723773 ]

ASF GitHub Bot commented on SPARK-26390:

cloud-fan opened a new pull request #23343: [SPARK-26390][SQL] ColumnPruning rule should only do column pruning
URL: https://github.com/apache/spark/pull/23343

## What changes were proposed in this pull request?

This is a small clean-up. By design, catalyst rules should be orthogonal: each rule should have its own responsibility. However, the `ColumnPruning` rule does not only do column pruning; it also removes no-op Project and Window nodes. This PR updates the `RemoveRedundantProject` rule to remove no-op Window nodes as well, and cleans up the `ColumnPruning` rule so that it only does column pruning.

## How was this patch tested?

Existing tests.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> ColumnPruning rule should only do column pruning
>
> Key: SPARK-26390
> URL: https://issues.apache.org/jira/browse/SPARK-26390
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Wenchen Fan
> Assignee: Wenchen Fan
> Priority: Major

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26390) ColumnPruning rule should only do column pruning
[ https://issues.apache.org/jira/browse/SPARK-26390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26390: Assignee: Wenchen Fan (was: Apache Spark)
[jira] [Assigned] (SPARK-26390) ColumnPruning rule should only do column pruning
[ https://issues.apache.org/jira/browse/SPARK-26390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26390: Assignee: Apache Spark (was: Wenchen Fan)
[jira] [Created] (SPARK-26390) ColumnPruning rule should only do column pruning
Wenchen Fan created SPARK-26390:

Summary: ColumnPruning rule should only do column pruning
Key: SPARK-26390
URL: https://issues.apache.org/jira/browse/SPARK-26390
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.0.0
Reporter: Wenchen Fan
Assignee: Wenchen Fan
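The rule split described in the PR can be illustrated with a toy plan AST. The classes below are hypothetical, heavily simplified stand-ins for Spark's `LogicalPlan` nodes, not the real API: a `RemoveRedundantProject`-style rule elides any Project whose output equals its child's output, leaving column-pruning decisions entirely to a separate rule.

```scala
// Toy logical plan AST (illustrative only, not Spark's actual classes).
sealed trait LogicalPlan { def output: Seq[String] }
case class Relation(output: Seq[String]) extends LogicalPlan
case class Project(projectList: Seq[String], child: LogicalPlan) extends LogicalPlan {
  def output: Seq[String] = projectList
}

// RemoveRedundantProject-style cleanup: a Project that emits exactly its
// child's output changes nothing and can be removed from the tree.
object RemoveRedundantProject {
  def apply(plan: LogicalPlan): LogicalPlan = plan match {
    case Project(list, child) if list == child.output => apply(child)
    case Project(list, child)                         => Project(list, apply(child))
    case other                                        => other
  }
}

// A stack of two no-op projections collapses down to the bare relation.
val plan = Project(Seq("a", "b"), Project(Seq("a", "b"), Relation(Seq("a", "b"))))
val cleaned = RemoveRedundantProject(plan)
```

A pruning rule would then only ever shrink `projectList`, never delete Project nodes, keeping the two responsibilities orthogonal as the PR intends.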
[jira] [Commented] (SPARK-26324) Spark submit does not work with messos over ssl [Missing docs]
[ https://issues.apache.org/jira/browse/SPARK-26324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723761#comment-16723761 ]

Jorge Machado commented on SPARK-26324:

[~hyukjin.kwon] I created a PR for these docs.

> Hi guys,
> I was trying to run the examples on a Mesos cluster that uses HTTPS. I tried with the REST endpoint:
> {code:java}
> ./spark-submit --class org.apache.spark.examples.SparkPi --master mesos://:5050 --conf spark.master.rest.enabled=true --deploy-mode cluster --supervise --executor-memory 10G --total-executor-cores 100 ../examples/jars/spark-examples_2.11-2.4.0.jar 1000
> {code}
> The error that I get on the host where I started the spark-submit is:
> {code:java}
> 2018-12-10 15:08:39 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 2018-12-10 15:08:39 INFO RestSubmissionClient:54 - Submitting a request to launch an application in mesos://:5050.
> 2018-12-10 15:08:39 WARN RestSubmissionClient:66 - Unable to connect to server mesos://:5050.
> Exception in thread "main" org.apache.spark.deploy.rest.SubmitRestConnectionException: Unable to connect to server
> at org.apache.spark.deploy.rest.RestSubmissionClient$$anonfun$createSubmission$3.apply(RestSubmissionClient.scala:104)
> at org.apache.spark.deploy.rest.RestSubmissionClient$$anonfun$createSubmission$3.apply(RestSubmissionClient.scala:86)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> at org.apache.spark.deploy.rest.RestSubmissionClient.createSubmission(RestSubmissionClient.scala:86)
> at org.apache.spark.deploy.rest.RestSubmissionClientApp.run(RestSubmissionClient.scala:443)
> at org.apache.spark.deploy.rest.RestSubmissionClientApp.start(RestSubmissionClient.scala:455)
> at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
> at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.spark.deploy.rest.SubmitRestConnectionException: Unable to connect to server
> at org.apache.spark.deploy.rest.RestSubmissionClient.readResponse(RestSubmissionClient.scala:281)
> at org.apache.spark.deploy.rest.RestSubmissionClient.org$apache$spark$deploy$rest$RestSubmissionClient$$postJson(RestSubmissionClient.scala:225)
> at org.apache.spark.deploy.rest.RestSubmissionClient$$anonfun$createSubmission$3.apply(RestSubmissionClient.scala:90)
> ... 15 more
> Caused by: java.net.SocketException: Connection reset
> {code}
> I'm pretty sure this is because of the hardcoded http:// here:
> {code:java}
> RestSubmissionClient.scala
> /** Return the base URL for communicating with the server, including the protocol version. */
> private def getBaseUrl(master: String): String = {
>   var masterUrl = master
>   supportedMasterPrefixes.foreach { prefix =>
>     if (master.startsWith(prefix)) {
>       masterUrl = master.stripPrefix(prefix)
>     }
>   }
>   masterUrl = masterUrl.stripSuffix("/")
>   s"http://$masterUrl/$PROTOCOL_VERSION/submissions"  // <--- hardcoded http
> }
> {code}
> Then I tried without the _--deploy-mode cluster_ and I get:
> {code:java}
> ./spark-submit --class org.apache.spark.examples.SparkPi --master mesos://:5050 --supervise --executor-memory 10G --total-executor-cores 100 ../examples/jars/spark-examples_2.11-2.4.0.jar 1000
> {code}
> On the spark console I get:
> {code:java}
> 2018-12-10 15:01:05 INFO SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http://_host:4040
> 2018-12-10 15:01:05 INFO SparkContext:54 - Added JAR file:/home//spark-2.4.0-bin-hadoop2.7/bin/../examples/jars/spark-examples_2.11-2.4.0.jar at spark://_host:35719/jars/spark-examples_2.11-2.4.0.jar with timestamp 1544450465799
> I1210 15:01:05.963078 37943
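A minimal sketch of the fix the reporter is pointing at — deriving the scheme from configuration instead of hardcoding `http://`. This is an assumption-laden illustration, not the actual patch: the `useSsl` flag is hypothetical (the real client would read it from the SSL configuration), and the literal `"v1"` stands in for `$PROTOCOL_VERSION` in `RestSubmissionClient`.

```scala
// Hypothetical scheme-aware variant of getBaseUrl. `useSsl` and the "v1"
// protocol segment are illustrative assumptions, not Spark's actual API.
def getBaseUrl(master: String, useSsl: Boolean): String = {
  val supportedMasterPrefixes = Seq("spark://", "mesos://")
  var masterUrl = master
  supportedMasterPrefixes.foreach { prefix =>
    if (master.startsWith(prefix)) {
      masterUrl = master.stripPrefix(prefix)
    }
  }
  masterUrl = masterUrl.stripSuffix("/")
  // Pick the scheme from configuration instead of hardcoding "http".
  val scheme = if (useSsl) "https" else "http"
  s"$scheme://$masterUrl/v1/submissions"
}
```

With this shape, `getBaseUrl("mesos://example:5050", useSsl = true)` would produce an `https://` submission URL, which is the behavior the reporter expected against an HTTPS-only Mesos master.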
[jira] [Commented] (SPARK-26324) Spark submit does not work with messos over ssl [Missing docs]
[ https://issues.apache.org/jira/browse/SPARK-26324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723754#comment-16723754 ]

ASF GitHub Bot commented on SPARK-26324:

jomach opened a new pull request #23342: [SPARK-26324][DOCS] Add Spark docs for Running in Mesos with SSL
URL: https://github.com/apache/spark/pull/23342

## What changes were proposed in this pull request?

Added docs for running Spark jobs with Mesos on SSL.
[jira] [Assigned] (SPARK-26324) Spark submit does not work with messos over ssl [Missing docs]
[ https://issues.apache.org/jira/browse/SPARK-26324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26324: Assignee: Apache Spark
[jira] [Assigned] (SPARK-26324) Spark submit does not work with messos over ssl [Missing docs]
[ https://issues.apache.org/jira/browse/SPARK-26324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26324: Assignee: (was: Apache Spark)
[jira] [Comment Edited] (SPARK-26223) Scan: track metastore operation time
[ https://issues.apache.org/jira/browse/SPARK-26223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723111#comment-16723111 ]

Yuanjian Li edited comment on SPARK-26223 at 12/18/18 6:34 AM:

The usage of externalCatalog in `SessionCatalog` and the interface of `ExternalCatalog` are clear clues for this issue. Most interfaces in `ExternalCatalog` are used in DDL; the metastore operations related to Scan are listed below:
1. getTable: called by the analysis rule ResolveRelation's lookupRelation.
2. listPartitions:
   1. Called at execution time by HiveTableScanExec when getting raw partitions.
   2. Called by the optimizer rule OptimizeMetadataOnlyQuery's replaceTableScanWithPartitionMetadata.
   3. Called by HiveMetastoreCatalog.convertToLogicalRelation when lazy pruning is disabled; the entrance to this scenario is the analysis rule RelationConversions of the Hive analyzer.
3. listPartitionsByFilter:
   1. Same as 2.1.
   2. Same as 2.2.
   3. Called by CatalogFileIndex. Currently we account for this metastore operation time under file listing ([discussion link|https://github.com/apache/spark/pull/23327#discussion_r242076144]); it will be split out in this PR.

We can address all of these scenarios by appending each phase to a new array buffer in the `CatalogTable` parameter list and dumping the records to metrics in the scan node.

> Scan: track metastore operation time
>
> Key: SPARK-26223
> URL: https://issues.apache.org/jira/browse/SPARK-26223
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Reynold Xin
> Priority: Major
>
> The Scan node should report how much time it spent in metastore operations. Similar to file listing, it would be great to also report start and end time for constructing a timeline.
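The mechanism proposed in the comment — each metastore call appends a phase record to a buffer that a scan node later dumps into its metrics — can be sketched standalone. All names here are illustrative assumptions; in Spark the buffer would travel with the table metadata rather than a freestanding class:

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch: wrap each metastore operation in `timed`, appending a
// (phase, elapsed-nanos) record that a scan node could report as a metric.
final case class PhaseTiming(phase: String, nanos: Long)

class MetastoreTimings {
  val records = ArrayBuffer.empty[PhaseTiming]

  // Run `body`, recording which phase it belonged to and how long it took.
  def timed[T](phase: String)(body: => T): T = {
    val start = System.nanoTime()
    try body
    finally records += PhaseTiming(phase, System.nanoTime() - start)
  }
}

// A hypothetical listPartitions call records its phase alongside its result.
val timings = new MetastoreTimings
val partitions = timings.timed("listPartitions") { Seq("p=1", "p=2") }
```

Because `timed` is a by-name wrapper, call sites like getTable or listPartitionsByFilter only change at the point of invocation, which fits the "append phase records, dump in the scan node" plan above.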
[jira] [Commented] (SPARK-26246) Infer timestamp types from JSON
[ https://issues.apache.org/jira/browse/SPARK-26246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723717#comment-16723717 ]

ASF GitHub Bot commented on SPARK-26246:

asfgit closed pull request #23201: [SPARK-26246][SQL] Inferring TimestampType from JSON
URL: https://github.com/apache/spark/pull/23201

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

{code:java}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 263e05de32075..d1bc00c08c1c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -28,7 +28,7 @@
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
-import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -37,6 +37,12 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {

   private val decimalParser = ExprUtils.getDecimalParser(options.locale)

+  @transient
+  private lazy val timestampFormatter = TimestampFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+
   /**
    * Infer the type of a collection of json records in three stages:
    *   1. Infer the type of each record
@@ -115,13 +121,19 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
         // record fields' types have been combined.
         NullType

-      case VALUE_STRING if options.prefersDecimal =>
+      case VALUE_STRING =>
+        val field = parser.getText
         val decimalTry = allCatch opt {
-          val bigDecimal = decimalParser(parser.getText)
+          val bigDecimal = decimalParser(field)
           DecimalType(bigDecimal.precision, bigDecimal.scale)
         }
-        decimalTry.getOrElse(StringType)
-      case VALUE_STRING => StringType
+        if (options.prefersDecimal && decimalTry.isDefined) {
+          decimalTry.get
+        } else if ((allCatch opt timestampFormatter.parse(field)).isDefined) {
+          TimestampType
+        } else {
+          StringType
+        }

       case START_OBJECT =>
         val builder = Array.newBuilder[StructField]

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
new file mode 100644
index 0..9307f9b47b807
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import com.fasterxml.jackson.core.JsonFactory
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
+
+  def checkType(options: Map[String, String], json: String, dt: DataType): Unit = {
+    val jsonOptions = new JSONOptions(options, "UTC", "")
+    val inferSchema = new JsonInferSchema(jsonOptions)
+    val factory = new JsonFactory()
+    jsonOptions.setJacksonOptions(factory)
+    val parser = CreateJacksonParser.string(factory, json)
+    parser.nextToken()
+    val expectedType = StructType(Seq(StructField("a", dt, true)))
+
+    assert(inferSchema.inferField(parser) === expectedType)
+  }
+
{code}
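The inference order this patch introduces for string tokens can be reproduced standalone. The sketch below is a simplified stand-in, not Spark's code: `java.time` replaces Spark's `TimestampFormatter`, one fixed pattern replaces the configurable `timestampFormat`, and type names are returned as plain strings.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Try

// Simplified illustration of the fallback chain for a JSON string token:
// DecimalType when prefersDecimal is set and the text is numeric, then
// TimestampType when the text parses with the timestamp format, else StringType.
def inferStringType(field: String, prefersDecimal: Boolean): String = {
  val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")
  val decimalTry = Try(BigDecimal(field)).toOption
  if (prefersDecimal && decimalTry.isDefined) "DecimalType"
  else if (Try(LocalDateTime.parse(field, fmt)).isSuccess) "TimestampType"
  else "StringType"
}
```

Note the ordering matters: the decimal attempt must run first (when enabled) because some numeric strings would otherwise never be considered, and anything that fails both parses falls through to plain `StringType`, matching the behavior before the patch.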
[jira] [Resolved] (SPARK-26246) Infer timestamp types from JSON
[ https://issues.apache.org/jira/browse/SPARK-26246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-26246.
Resolution: Fixed
Fix Version/s: 3.0.0
Issue resolved by pull request 23201: [https://github.com/apache/spark/pull/23201]

> Infer timestamp types from JSON
>
> Key: SPARK-26246
> URL: https://issues.apache.org/jira/browse/SPARK-26246
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Maxim Gekk
> Assignee: Maxim Gekk
> Priority: Minor
> Fix For: 3.0.0
>
> Currently, TimestampType cannot be inferred from JSON. To parse a JSON string, you have to specify the schema explicitly if the JSON input contains timestamps. This ticket aims to extend JsonInferSchema to support such inference.
[jira] [Assigned] (SPARK-26246) Infer timestamp types from JSON
[ https://issues.apache.org/jira/browse/SPARK-26246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon reassigned SPARK-26246: Assignee: Maxim Gekk
[jira] [Commented] (SPARK-26081) Do not write empty files by text datasources
[ https://issues.apache.org/jira/browse/SPARK-26081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723679#comment-16723679 ] ASF GitHub Bot commented on SPARK-26081: HyukjinKwon opened a new pull request #23341: [SPARK-26081][SQL] Use foreach instead of misuse of map (for Unit) URL: https://github.com/apache/spark/pull/23341 ## What changes were proposed in this pull request? This PR proposes to use foreach instead of misusing map (for Unit). The misuse could potentially cause odd errors, and it is not good practice anyway. See also SPARK-16694. ## How was this patch tested? N/A This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Do not write empty files by text datasources > > > Key: SPARK-26081 > URL: https://issues.apache.org/jira/browse/SPARK-26081 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Minor > Labels: release-notes > Fix For: 3.0.0 > > > Text-based datasources like CSV, JSON and Text produce empty files for empty > partitions, which introduces additional overhead when opening and reading > such files back. In the current implementation of OutputWriter, the output > stream is created eagerly even when no records are written to it, so its > creation can be postponed until the first write. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
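The lazy-open idea in the SPARK-26081 description (postpone stream creation until the first write, so empty partitions leave no files) can be sketched as follows. `LazyWriter` is an illustrative stand-in, not Spark's actual `OutputWriter` interface:

```python
import os
import tempfile

class LazyWriter:
    """Open the underlying file only when the first record arrives,
    so an empty partition never produces a file on disk."""

    def __init__(self, path):
        self.path = path
        self._fh = None

    def write(self, record):
        if self._fh is None:  # creation postponed until the first write
            self._fh = open(self.path, "w")
        self._fh.write(record + "\n")

    def close(self):
        if self._fh is not None:
            self._fh.close()

out_dir = tempfile.mkdtemp()
empty = LazyWriter(os.path.join(out_dir, "part-00000"))
empty.close()  # no records were written, so no file is created
```

The key design point is that `close()` on a never-written writer is a no-op, which is what lets empty partitions disappear entirely instead of leaving zero-byte files.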
[jira] [Updated] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown
[ https://issues.apache.org/jira/browse/SPARK-26389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fengyu Cao updated SPARK-26389: --- Description: {{spark-submit --master mesos:// --conf spark.streaming.stopGracefullyOnShutdown=true }} CTRL-C, framework shutdown {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error org.apache.spark.SparkException: Writing job aborted.}} {{/tmp/temporary- on executor not deleted due to org.apache.spark.SparkException: Writing job aborted., and this temp checkpoint can't be used for recovery.}} was: {{spark-submit --master mesos:// -conf spark.streaming.stopGracefullyOnShutdown=true }} CTRL-C, framework shutdown {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error}} {{ org.apache.spark.SparkException: Writing job aborted.}} {{/tmp/temporary- on executor not deleted due to org.apache.spark.SparkException: Writing job aborted., and this temp checkpoint can't used to recovery.}} > temp checkpoint folder at executor should be deleted on graceful shutdown > - > > Key: SPARK-26389 > URL: https://issues.apache.org/jira/browse/SPARK-26389 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Fengyu Cao >Priority: Major > > {{spark-submit --master mesos:// --conf > spark.streaming.stopGracefullyOnShutdown=true framework>}} > CTRL-C, framework shutdown > {{18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = > f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = > 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error > org.apache.spark.SparkException: Writing job aborted.}} > {{/tmp/temporary- on executor not deleted due to > org.apache.spark.SparkException: Writing job aborted., and this temp > checkpoint can't be used for recovery.}} > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26389) temp checkpoint folder at executor should be deleted on graceful shutdown
Fengyu Cao created SPARK-26389: -- Summary: temp checkpoint folder at executor should be deleted on graceful shutdown Key: SPARK-26389 URL: https://issues.apache.org/jira/browse/SPARK-26389 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Reporter: Fengyu Cao spark-submit --master mesos:// --conf spark.streaming.stopGracefullyOnShutdown=true CTRL-C, framework shutdown 18/12/18 10:27:36 ERROR MicroBatchExecution: Query [id = f512e17a-df88-4414-a5cd-a23550cf1e7f, runId = 24d99723-8d61-48c0-beab-af432f7a19d3] terminated with error org.apache.spark.SparkException: Writing job aborted. /tmp/temporary- on executor not deleted due to `org.apache.spark.SparkException: Writing job aborted.`, and this temp checkpoint can't be used for recovery. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
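The behavior SPARK-26389 asks for (remove the temporary checkpoint directory on a graceful shutdown, even if the last batch aborted) can be sketched with a process-exit cleanup hook. This is an illustration of the idea only, not Spark's actual shutdown path:

```python
import atexit
import os
import shutil
import tempfile

def create_temp_checkpoint(prefix="temporary-"):
    """Create a /tmp/temporary-* style checkpoint directory and register
    a cleanup hook, so a graceful shutdown (e.g. CTRL-C handled and turned
    into a normal exit) removes it even after a failed micro-batch."""
    path = tempfile.mkdtemp(prefix=prefix)
    atexit.register(shutil.rmtree, path, ignore_errors=True)
    return path
```

A hard kill would still leave the directory behind; the point of the ticket is that a *graceful* shutdown should not.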
[jira] [Assigned] (SPARK-23431) Expose the new executor memory metrics at the stage level
[ https://issues.apache.org/jira/browse/SPARK-23431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23431: Assignee: Apache Spark > Expose the new executor memory metrics at the stage level > - > > Key: SPARK-23431 > URL: https://issues.apache.org/jira/browse/SPARK-23431 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.2.1 >Reporter: Edwina Lu >Assignee: Apache Spark >Priority: Major > > Collect and show the new executor memory metrics for each stage, to provide > more information on how memory is used per stage. > Modify the AppStatusListener to track the peak values for JVM used memory, > execution memory, storage memory, and unified memory for each executor for > each stage. > Add the peak values for the metrics to the stages REST API. Also add a new > stageSummary REST API, which will return executor summary metrics for a > specified stage: > {code:java} > curl http://:18080/api/v1/applications/ id>// attempt>/executorSummary{code} > This is a subtask for SPARK-23206. Please refer to the design doc for that > ticket for more details. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23431) Expose the new executor memory metrics at the stage level
[ https://issues.apache.org/jira/browse/SPARK-23431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23431: Assignee: (was: Apache Spark) > Expose the new executor memory metrics at the stage level > - > > Key: SPARK-23431 > URL: https://issues.apache.org/jira/browse/SPARK-23431 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.2.1 >Reporter: Edwina Lu >Priority: Major > > Collect and show the new executor memory metrics for each stage, to provide > more information on how memory is used per stage. > Modify the AppStatusListener to track the peak values for JVM used memory, > execution memory, storage memory, and unified memory for each executor for > each stage. > Add the peak values for the metrics to the stages REST API. Also add a new > stageSummary REST API, which will return executor summary metrics for a > specified stage: > {code:java} > curl http://:18080/api/v1/applications/ id>// attempt>/executorSummary{code} > This is a subtask for SPARK-23206. Please refer to the design doc for that > ticket for more details. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23431) Expose the new executor memory metrics at the stage level
[ https://issues.apache.org/jira/browse/SPARK-23431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723610#comment-16723610 ] ASF GitHub Bot commented on SPARK-23431: edwinalu opened a new pull request #23340: [SPARK-23431][CORE] Expose the new executor memory metrics at the stage level URL: https://github.com/apache/spark/pull/23340 ## What changes were proposed in this pull request? Collect and show the new executor memory metrics for each stage, to provide more information on how memory is used per stage. Peak values for metrics are shown for each stage. For executor summaries for each stage, the peak values per executor are also shown. ## How was this patch tested? Added new unit tests. Please review http://spark.apache.org/contributing.html before opening a pull request. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Expose the new executor memory metrics at the stage level > - > > Key: SPARK-23431 > URL: https://issues.apache.org/jira/browse/SPARK-23431 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.2.1 >Reporter: Edwina Lu >Priority: Major > > Collect and show the new executor memory metrics for each stage, to provide > more information on how memory is used per stage. > Modify the AppStatusListener to track the peak values for JVM used memory, > execution memory, storage memory, and unified memory for each executor for > each stage. > Add the peak values for the metrics to the stages REST API. Also add a new > stageSummary REST API, which will return executor summary metrics for a > specified stage: > {code:java} > curl http://<history server>:18080/api/v1/applications/<application id>/<stage id>/<stage attempt>/executorSummary{code} > This is a subtask for SPARK-23206. Please refer to the design doc for that > ticket for more details.
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
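Tracking "peak values per executor per stage", as the listener change above describes, reduces to a max-merge keyed on (stage, executor). A minimal sketch of the bookkeeping (names are illustrative, not the `AppStatusListener` API):

```python
from collections import defaultdict

# peaks[(stage_id, executor_id)] maps a metric name to the highest value
# observed so far for that executor within that stage.
peaks = defaultdict(dict)

def record_metrics(stage_id, executor_id, snapshot):
    """Merge one metrics snapshot, keeping the per-metric peak."""
    entry = peaks[(stage_id, executor_id)]
    for name, value in snapshot.items():
        entry[name] = max(entry.get(name, value), value)

# Two snapshots for the same executor in stage 0: the peak of each metric
# survives, even though neither snapshot dominates the other.
record_metrics(0, "exec-1", {"jvmUsedMemory": 100, "executionMemory": 40})
record_metrics(0, "exec-1", {"jvmUsedMemory": 80, "executionMemory": 90})
```

A REST endpoint for a stage would then just serialize `peaks` filtered by `stage_id`.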
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723552#comment-16723552 ] ASF GitHub Bot commented on SPARK-24561: asfgit closed pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index f98e550e39da8..d188de39e21c7 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2982,8 +2982,7 @@ def pandas_udf(f=None, returnType=None, functionType=None): | 2|6.0| +---+---+ - This example shows using grouped aggregated UDFs as window functions. Note that only - unbounded window frame is supported at the moment: + This example shows using grouped aggregated UDFs as window functions. >>> from pyspark.sql.functions import pandas_udf, PandasUDFType >>> from pyspark.sql import Window @@ -2993,20 +2992,24 @@ def pandas_udf(f=None, returnType=None, functionType=None): >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP ... def mean_udf(v): ... return v.mean() - >>> w = Window \\ - ... .partitionBy('id') \\ - ... .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) + >>> w = (Window.partitionBy('id') + ....orderBy('v') + ....rowsBetween(-1, 0)) >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show() # doctest: +SKIP +---++--+ | id| v|mean_v| +---++--+ - | 1| 1.0| 1.5| + | 1| 1.0| 1.0| | 1| 2.0| 1.5| - | 2| 3.0| 6.0| - | 2| 5.0| 6.0| - | 2|10.0| 6.0| + | 2| 3.0| 3.0| + | 2| 5.0| 4.0| + | 2|10.0| 7.5| +---++--+ + .. 
note:: For performance reasons, the input series to window functions are not copied. +Therefore, mutating the input series is not allowed and will cause incorrect results. +For the same reason, users should also not rely on the index of the input series. + .. seealso:: :meth:`pyspark.sql.GroupedData.agg` and :class:`pyspark.sql.Window` .. note:: The user-defined functions are considered deterministic by default. Due to diff --git a/python/pyspark/sql/tests/test_pandas_udf_window.py b/python/pyspark/sql/tests/test_pandas_udf_window.py index f0e6d2696df62..1b7df6797e9e6 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_window.py +++ b/python/pyspark/sql/tests/test_pandas_udf_window.py @@ -47,6 +47,15 @@ def pandas_scalar_time_two(self): from pyspark.sql.functions import pandas_udf return pandas_udf(lambda v: v * 2, 'double') +@property +def pandas_agg_count_udf(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +@pandas_udf('long', PandasUDFType.GROUPED_AGG) +def count(v): +return len(v) +return count + @property def pandas_agg_mean_udf(self): from pyspark.sql.functions import pandas_udf, PandasUDFType @@ -77,7 +86,7 @@ def min(v): @property def unbounded_window(self): return Window.partitionBy('id') \ -.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) +.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing).orderBy('v') @property def ordered_window(self): @@ -87,6 +96,32 @@ def ordered_window(self): def unpartitioned_window(self): return Window.partitionBy() +@property +def sliding_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1) + +@property +def sliding_range_window(self): +return Window.partitionBy('id').orderBy('v').rangeBetween(-2, 4) + +@property +def growing_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3) + +@property +def growing_range_window(self): +return Window.partitionBy('id').orderBy('v') \ 
+.rangeBetween(Window.unboundedPreceding, 4) + +@property +def shrinking_row_window(self): +return Window.partitionBy('id').orderBy('v').rowsBetween(-2, Window.unboundedFollowing) + +@property +def shrinking_range_window(self): +return Window.partitionBy('id').orderBy('v') \ +.rangeBetween(-3, Window.unboundedFollowing) + def test_simple(self): from
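The bounded `rowsBetween(-1, 0)` frame in the updated doctest above can be reproduced without Spark. A plain-Python sketch of a sliding row-frame mean over one ordered partition (assuming the values are already sorted by the ordering column):

```python
def bounded_row_mean(values, start, end):
    """Mean over a rows-between frame [start, end] relative to each row,
    mirroring Window.rowsBetween semantics on an ordered partition."""
    out = []
    for i in range(len(values)):
        lo = max(0, i + start)          # clamp frame at partition start
        hi = min(len(values), i + end + 1)  # end bound is inclusive
        frame = values[lo:hi]
        out.append(sum(frame) / len(frame))
    return out

# Partition id=2 from the doctest, frame (-1, 0): current row and the
# one before it.
print(bounded_row_mean([3.0, 5.0, 10.0], -1, 0))  # → [3.0, 4.0, 7.5]
```

This matches the `mean_v` column shown in the new doctest: each row's mean covers only its own frame rather than the whole partition, which is exactly what the bounded-window support adds.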
[jira] [Resolved] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-24561. -- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 22305 [https://github.com/apache/spark/pull/22305] > User-defined window functions with pandas udf (bounded window) > -- > > Key: SPARK-24561 > URL: https://issues.apache.org/jira/browse/SPARK-24561 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.3.1 >Reporter: Li Jin >Assignee: Li Jin >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-24561: Assignee: Li Jin > User-defined window functions with pandas udf (bounded window) > -- > > Key: SPARK-24561 > URL: https://issues.apache.org/jira/browse/SPARK-24561 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.3.1 >Reporter: Li Jin >Assignee: Li Jin >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26388) No support for "alter table .. replace columns" to drop columns
[ https://issues.apache.org/jira/browse/SPARK-26388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nirav patel updated SPARK-26388: Affects Version/s: 2.3.2 > No support for "alter table .. replace columns" to drop columns > --- > > Key: SPARK-26388 > URL: https://issues.apache.org/jira/browse/SPARK-26388 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1, 2.3.1, 2.3.2 >Reporter: nirav patel >Priority: Major > > Looks like hive {{replace columns}} is not working with spark 2.2.1 and 2.3.1 > > {{alterSchemaSql : alter table myschema.mytable replace columns (a int,b > int,d int) Exception in thread "main" > org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: > alter table replace columns(line 2, pos 6) }} > {{ADD COLUMNS}} works, which seems to have been previously reported and fixed: > https://issues.apache.org/jira/browse/SPARK-18893 > > Replace columns should be supported as well; afaik, that's the only way to > delete hive columns. > > > It's supposed to work according to these docs: > [https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns] > [https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features] > > but it's throwing an error for me on 2 different versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
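The semantics the reporter expects from REPLACE COLUMNS are simple schema arithmetic: the new column list wholly replaces the old one, which is how Hive lets you drop a column. A small sketch of that semantics (illustrative only, not Spark or Hive code):

```python
def replace_columns(current, replacement):
    """Simulate ALTER TABLE ... REPLACE COLUMNS: the resulting schema is
    exactly the replacement list; also report what was dropped and added."""
    dropped = [c for c in current if c not in replacement]
    added = [c for c in replacement if c not in current]
    return replacement, dropped, added

# The ticket's example: replacing (a, b, c) with (a, b, d) drops c.
schema, dropped, added = replace_columns(["a", "b", "c"], ["a", "b", "d"])
```

Because Spark's parser rejects the statement, this whole-list replacement is currently impossible through SQL on a Hive table from Spark.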
[jira] [Assigned] (SPARK-25689) Move token renewal logic to driver in yarn-client mode
[ https://issues.apache.org/jira/browse/SPARK-25689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-25689: Assignee: (was: Apache Spark) > Move token renewal logic to driver in yarn-client mode > -- > > Key: SPARK-25689 > URL: https://issues.apache.org/jira/browse/SPARK-25689 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 2.4.0 >Reporter: Marcelo Vanzin >Priority: Minor > > Currently, both in yarn-cluster and yarn-client mode, the YARN AM is > responsible for renewing delegation tokens. That differs from other RMs > (Mesos and later k8s when it supports this functionality), and is one of the > roadblocks towards fully sharing the same delegation token-related code. > We should look at keeping the renewal logic within the driver in yarn-client > mode. That would also remove the need to distribute the user's keytab to the > AM when running in that particular mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-25689) Move token renewal logic to driver in yarn-client mode
[ https://issues.apache.org/jira/browse/SPARK-25689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-25689: Assignee: Apache Spark > Move token renewal logic to driver in yarn-client mode > -- > > Key: SPARK-25689 > URL: https://issues.apache.org/jira/browse/SPARK-25689 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 2.4.0 >Reporter: Marcelo Vanzin >Assignee: Apache Spark >Priority: Minor > > Currently, both in yarn-cluster and yarn-client mode, the YARN AM is > responsible for renewing delegation tokens. That differs from other RMs > (Mesos and later k8s when it supports this functionality), and is one of the > roadblocks towards fully sharing the same delegation token-related code. > We should look at keeping the renewal logic within the driver in yarn-client > mode. That would also remove the need to distribute the user's keytab to the > AM when running in that particular mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-25689) Move token renewal logic to driver in yarn-client mode
[ https://issues.apache.org/jira/browse/SPARK-25689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723523#comment-16723523 ] ASF GitHub Bot commented on SPARK-25689: vanzin opened a new pull request #23338: [SPARK-25689][yarn] Make driver, not AM, manage delegation tokens. URL: https://github.com/apache/spark/pull/23338 This change modifies the behavior of the delegation token code when running on YARN, so that the driver controls the renewal, in both client and cluster mode. For that, a few different things were changed: * The AM code only runs code that needs DTs when DTs are available. In a way, this restores the AM behavior to what it was pre-SPARK-23361, but keeping the fix added in that bug. Basically, all the AM code is run in a "UGI.doAs()" block; but code that needs to talk to HDFS (basically the distributed cache handling code) was delayed to the point where the driver is up and running, and thus when valid delegation tokens are available. * SparkSubmit / ApplicationMaster now handle user login, not the token manager. The previous AM code was relying on the token manager to keep the user logged in when keytabs are used. This required some odd APIs in the token manager and the AM so that the right UGI was exposed and used in the right places. After this change, the logged in user is handled separately from the token manager, so the API was cleaned up, and, as explained above, the whole AM runs under the logged in user, which also helps with simplifying some more code. * Distributed cache configs are sent separately to the AM. Because of the delayed initialization of the cached resources in the AM, it became easier to write the cache config to a separate properties file instead of bundling it with the rest of the Spark config. This also avoids having to modify the SparkConf to hide things from the UI. * Finally, the AM doesn't manage the token manager anymore. 
The above changes allow the token manager to be completely handled by the driver's scheduler backend code also in YARN mode (whether client or cluster), making it similar to other RMs. To maintain the fix added in SPARK-23361 also in client mode, the AM now sends an extra message to the driver on initialization to fetch delegation tokens; and although it might not really be needed, the driver also keeps the running AM updated when new tokens are created. Tested in a kerberized cluster with the same tests used to validate SPARK-23361, in both client and cluster mode. Also tested with a non-kerberized cluster. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Move token renewal logic to driver in yarn-client mode > -- > > Key: SPARK-25689 > URL: https://issues.apache.org/jira/browse/SPARK-25689 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 2.4.0 >Reporter: Marcelo Vanzin >Priority: Minor > > Currently, both in yarn-cluster and yarn-client mode, the YARN AM is > responsible for renewing delegation tokens. That differs from other RMs > (Mesos and later k8s when it supports this functionality), and is one of the > roadblocks towards fully sharing the same delegation token-related code. > We should look at keeping the renewal logic within the driver in yarn-client > mode. That would also remove the need to distribute the user's keytab to the > AM when running in that particular mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
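The driver-managed renewal described above boils down to a periodic task owned by the driver's scheduler backend: obtain fresh delegation tokens, distribute them to the AM and executors, and reschedule. A threading-based sketch with hypothetical names (not Spark's actual token manager API):

```python
import threading

class TokenRenewer:
    """Driver-side renewer sketch: obtains delegation tokens on a schedule
    and pushes them out via a callback (e.g. to the AM and executors)."""

    def __init__(self, obtain_tokens, distribute, interval_seconds):
        self._obtain = obtain_tokens
        self._distribute = distribute
        self._interval = interval_seconds
        self._timer = None

    def start(self):
        # Initial tokens at startup, mirroring the AM's fetch-on-init
        # message to the driver described above.
        self._distribute(self._obtain())
        self._schedule()

    def _schedule(self):
        self._timer = threading.Timer(self._interval, self._renew)
        self._timer.daemon = True
        self._timer.start()

    def _renew(self):
        self._distribute(self._obtain())  # push renewed tokens out
        self._schedule()

    def stop(self):
        if self._timer is not None:
            self._timer.cancel()
```

Keeping this loop in the driver, for both client and cluster mode, is what lets YARN share the same token-handling code path as other resource managers.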
[jira] [Created] (SPARK-26388) No support for "alter table .. replace columns" to drop columns
nirav patel created SPARK-26388: --- Summary: No support for "alter table .. replace columns" to drop columns Key: SPARK-26388 URL: https://issues.apache.org/jira/browse/SPARK-26388 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.1, 2.2.1 Reporter: nirav patel Looks like hive {{replace columns}} is not working with spark 2.2.1 and 2.3.1 {{alterSchemaSql : alter table myschema.mytable replace columns (a int,b int,d int) Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: alter table replace columns(line 2, pos 6) }} {{ADD COLUMNS}} works, which seems to have been previously reported and fixed: https://issues.apache.org/jira/browse/SPARK-18893 Replace columns should be supported as well; afaik, that's the only way to delete hive columns. It's supposed to work according to these docs: [https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns] [https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features] but it's throwing an error for me on 2 different versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26346) Upgrade parquet to 1.11.0
[ https://issues.apache.org/jira/browse/SPARK-26346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723518#comment-16723518 ] Dongjoon Hyun commented on SPARK-26346: --- +1 > Upgrade parquet to 1.11.0 > - > > Key: SPARK-26346 > URL: https://issues.apache.org/jira/browse/SPARK-26346 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Yuming Wang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26367) Remove ReplaceExceptWithFilter from nonExcludableRules
[ https://issues.apache.org/jira/browse/SPARK-26367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-26367. --- Resolution: Won't Do Assignee: (was: Xiao Li) Since the PR is closed, I'll close this issue for now. Please feel free to reopen this if needed. > Remove ReplaceExceptWithFilter from nonExcludableRules > -- > > Key: SPARK-26367 > URL: https://issues.apache.org/jira/browse/SPARK-26367 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Xiao Li >Priority: Major > > ReplaceExceptWithFilter is optional and thus remove it from nonExcludableRules -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26375) Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics
[ https://issues.apache.org/jira/browse/SPARK-26375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-26375: -- Component/s: (was: Optimizer) SQL > Rule PruneFileSourcePartitions should be fired before any other rules based > on table statistics > --- > > Key: SPARK-26375 > URL: https://issues.apache.org/jira/browse/SPARK-26375 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wang, Gang >Priority: Major > > In catalyst, some optimize rules are base on table statistics, like rule > ReorderJoin, in which star schema is detected, and CostBasedJoinReorder. In > these rules, statistics accuracy are crucial. While, currently all these > rules are fired before partition pruning, which may result in inaccurate > statistics. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
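The ordering problem described above can be shown with a toy rule pipeline: if partition pruning runs first, statistics-based rules see post-pruning sizes; run the other way, they would decide on the full, inaccurate size. Illustrative plan and rule names only (not Catalyst's API):

```python
# Toy optimizer: each rule rewrites a plan dict in order. A stats-based
# rule must see sizes that already reflect partition pruning.

def prune_file_source_partitions(plan):
    kept = [p for p in plan["partitions"] if p["key"] == plan["filter_key"]]
    return {**plan, "partitions": kept,
            "size": sum(p["size"] for p in kept)}

def cost_based_join_reorder(plan):
    # Decision depends on plan["size"]; it is accurate only after pruning.
    return {**plan, "broadcast": plan["size"] < 100}

# Pruning is fired before the statistics-based rule, as proposed.
rules = [prune_file_source_partitions, cost_based_join_reorder]

plan = {"filter_key": "2018-12-18",
        "partitions": [{"key": "2018-12-17", "size": 500},
                       {"key": "2018-12-18", "size": 50}],
        "size": 550}
for rule in rules:
    plan = rule(plan)
```

With the rules in the proposed order, the reorder rule sees size 50 and chooses a broadcast; with the current order it would see 550 and make the wrong call.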
[jira] [Commented] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723469#comment-16723469 ] Dongjoon Hyun commented on SPARK-26377: --- Hi, [~kumarg.pavan]. JIRA is not for Q&A. If you have a question, please see http://spark.apache.org/community.html . There are helpful resources there. Also, `Fix Versions` and `Target Versions` are filled when the bug is fixed. (http://spark.apache.org/contributing.html) > java.lang.IllegalStateException: No current assignment for partition > > > Key: SPARK-26377 > URL: https://issues.apache.org/jira/browse/SPARK-26377 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.1 >Reporter: pavan >Priority: Critical > > Hi, > I am using spark kafkaDirectStream with SubscribePattern with initial > offsets for topics and a pattern. On running the Spark job on the job server > I am getting the following exception. The job is terminated. Please let me > know any quick resolution possible. 
> Kafka Params: > "bootstrap.servers" -> credentials.getBrokers, > "key.deserializer" -> classOf[StringDeserializer], > "value.deserializer" -> classOf[ByteArrayDeserializer], > "enable.auto.commit" -> (false: java.lang.Boolean) > "group.id" -> "abc" > API: > KafkaUtils.createDirectStream(streamingContext, PreferConsistent, > SubscribePattern[K, V](regexPattern, allKafkaParams, offsets), > perPartitionConfig) > > Error Log: > { "duration": "33.523 secs", "classPath": > "com.appiot.dataingestion.DataIngestionJob", "startTime": > "2018-12-15T18:28:08.207Z", "context": > "c~1d750906-1fa7-44f9-a258-04963ac53150~9dc097e3-bf0f-432c-9c27-68c41a4009cd", > "result": \\{ "message": "java.lang.IllegalStateException: No current > assignment for partition com-cibigdata2.v1.iot.raw_timeseries-0", > "errorClass": "java.lang.RuntimeException", "stack": > "java.lang.RuntimeException: java.lang.IllegalStateException: No current > assignment for partition com-cibigdata2.v1.iot.raw_timeseries-0\n\tat > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)\n\tat > > org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)\n\tat > > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)\n\tat > > org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:160)\n\tat > > org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:159)\n\tat > scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat > scala.collection.AbstractIterator.foreach(Iterator.scala:1336)\n\tat > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)\n\tat > scala.collection.AbstractIterable.foreach(Iterable.scala:54)\n\tat > org.apache.spark.streaming.kafka010.SubscribePattern.onStart(ConsumerStrategy.scala:159)\n\tat > > 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)\n\tat > > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)\n\tat > > org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat > > org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat > > scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)\n\tat > > scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)\n\tat > > scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)\n\tat > > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)\n\tat > > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat > scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)\n\tat > scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)\n\tat > > scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)\n\tat > > scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)\n\tat > > scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)\n\tat > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n\tat > ... run in separate thread using
[jira] [Commented] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723472#comment-16723472 ] Dongjoon Hyun commented on SPARK-26377: --- BTW, did you try the latest version, Apache Spark 2.4.0? If you didn't, I recommend trying it. It's better than 2.2.1. > java.lang.IllegalStateException: No current assignment for partition > > > Key: SPARK-26377 > URL: https://issues.apache.org/jira/browse/SPARK-26377 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.1 >Reporter: pavan >Priority: Critical >
[jira] [Updated] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-26377: -- Fix Version/s: (was: 2.2.1) > java.lang.IllegalStateException: No current assignment for partition > > > Key: SPARK-26377 > URL: https://issues.apache.org/jira/browse/SPARK-26377 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.1 >Reporter: pavan >Priority: Critical >
[jira] [Updated] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-26377: -- Target Version/s: (was: 2.2.1) > java.lang.IllegalStateException: No current assignment for partition > > > Key: SPARK-26377 > URL: https://issues.apache.org/jira/browse/SPARK-26377 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.1 >Reporter: pavan >Priority: Critical >
[jira] [Commented] (SPARK-26019) pyspark/accumulators.py: "TypeError: object of type 'NoneType' has no len()" in authenticate_and_accum_updates()
[ https://issues.apache.org/jira/browse/SPARK-26019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723418#comment-16723418 ] ASF GitHub Bot commented on SPARK-26019: squito opened a new pull request #23337: [SPARK-26019][PYSPARK] Allow insecure py4j gateways URL: https://github.com/apache/spark/pull/23337 Spark always creates secure py4j connections between java and python, but it also allows users to pass in their own connection. This restores the ability for users to pass in an _insecure_ connection, though it forces them to set 'spark.python.allowInsecurePy4j=true' and still issues a warning. Added test cases verifying the failure without the extra configuration, and verifying things still work with an insecure configuration (in particular, accumulators, as those were broken with an insecure py4j gateway before). For the tests, I added ways to create insecure gateways, but I tried to put in protections to make sure that wouldn't get used incorrectly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > pyspark/accumulators.py: "TypeError: object of type 'NoneType' has no len()" > in authenticate_and_accum_updates() > > > Key: SPARK-26019 > URL: https://issues.apache.org/jira/browse/SPARK-26019 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.2, 2.4.0 >Reporter: Ruslan Dautkhanov >Priority: Major > > pyspark's accumulator server expects a secure py4j connection between python > and the jvm. Spark will normally create a secure connection, but there is a > public api which allows you to pass in your own py4j connection. (this is > used by zeppelin, at least.) 
When this happens, you get an error like: > {noformat} > pyspark/accumulators.py: "TypeError: object of type 'NoneType' has no len()" > in authenticate_and_accum_updates() > {noformat} > We should change pyspark to > 1) warn loudly if a user passes in an insecure connection > 1a) I'd like to suggest that we even error out, unless the user actively > opts-in with a config like "spark.python.allowInsecurePy4j=true" > 2) The accumulator server should be changed to allow insecure connections. > note that SPARK-26349 will disallow insecure connections completely in 3.0. > > More info on how this occurs: > {code:python} > Exception happened during processing of request from ('127.0.0.1', 43418) > > Traceback (most recent call last): > File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line > 290, in _handle_request_noblock > self.process_request(request, client_address) > File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line > 318, in process_request > self.finish_request(request, client_address) > File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line > 331, in finish_request > self.RequestHandlerClass(request, client_address, self) > File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line > 652, in __init__ > self.handle() > File > "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", > line 263, in handle > poll(authenticate_and_accum_updates) > File > "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", > line 238, in poll > if func(): > File > "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", > line 251, in authenticate_and_accum_updates > received_token = self.rfile.read(len(auth_token)) > TypeError: object of type 'NoneType' has no len() > > {code} > > Error happens 
here: > https://github.com/apache/spark/blob/cb90617f894fd51a092710271823ec7d1cd3a668/python/pyspark/accumulators.py#L254 > The PySpark code was just running a simple pipeline of > binary_rdd = sc.binaryRecords(full_file_path, record_length).map(lambda .. ) > and then converting it to a dataframe and running a count on it. > It seems the error is flaky - on the next rerun it didn't happen. (But accumulators > don't actually work.) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
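The traceback above comes down to calling len() on an auth token that is None when the py4j gateway is insecure. A minimal reproduction of that failure mode (illustrative names only, not the actual pyspark internals):

```python
import io

def read_client_token(rfile, auth_token):
    # Sketch of the handler's read: it reads len(auth_token) bytes from the
    # socket to compare against the expected token. With an insecure py4j
    # gateway auth_token is None, so len(None) raises the reported TypeError,
    # which is why insecure connections need an explicit opt-in path instead
    # of ever reaching this call.
    return rfile.read(len(auth_token))

# Secure case: a token exists, so exactly len(token) bytes are read.
token = read_client_token(io.BytesIO(b"secret"), "secret")
```

Passing auth_token=None reproduces "TypeError: object of type 'NoneType' has no len()" from the report.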
[jira] [Created] (SPARK-26387) Parallelism seems to cause difference in CrossValidation model metrics
Evan Zamir created SPARK-26387: -- Summary: Parallelism seems to cause difference in CrossValidation model metrics Key: SPARK-26387 URL: https://issues.apache.org/jira/browse/SPARK-26387 Project: Spark Issue Type: Bug Components: ML, MLlib Affects Versions: 2.3.2, 2.3.1 Reporter: Evan Zamir I can only reproduce this issue when running Spark on different Amazon EMR versions, but it seems that between Spark 2.3.1 and 2.3.2 (corresponding to EMR versions 5.17/5.18) the presence of the parallelism parameter was causing the AUC metric to increase. Literally, I run the exact same code with and without parallelism, and the AUC of my models (logistic regression) changes significantly. I can't find a previous bug report relating to this, so I'm posting this as new. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-25922) [K8] Spark Driver/Executor "spark-app-selector" label mismatch
[ https://issues.apache.org/jira/browse/SPARK-25922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723392#comment-16723392 ] ASF GitHub Bot commented on SPARK-25922: asfgit closed pull request #23322: [SPARK-25922][K8] Spark Driver/Executor "spark-app-selector" label mismatch URL: https://github.com/apache/spark/pull/23322 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 68f6f2e46e316..03f5da2bb0bce 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -18,9 +18,10 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.ExecutorService -import io.fabric8.kubernetes.client.KubernetesClient import scala.concurrent.{ExecutionContext, Future} +import io.fabric8.kubernetes.client.KubernetesClient + import org.apache.spark.SparkContext import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ @@ -39,10 +40,10 @@ private[spark] class KubernetesClusterSchedulerBackend( lifecycleEventHandler: ExecutorPodsLifecycleManager, watchEvents: ExecutorPodsWatchSnapshotSource, pollEvents: ExecutorPodsPollingSnapshotSource) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { +extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { - private implicit val requestExecutorContext 
= ExecutionContext.fromExecutorService( -requestExecutorsService) + private implicit val requestExecutorContext = +ExecutionContext.fromExecutorService(requestExecutorsService) protected override val minRegisteredRatio = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { @@ -60,6 +61,17 @@ private[spark] class KubernetesClusterSchedulerBackend( removeExecutor(executorId, reason) } + /** + * Get an application ID associated with the job. + * This returns the string value of spark.app.id if set, otherwise + * the locally-generated ID from the superclass. + * + * @return The application ID + */ + override def applicationId(): String = { + conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId) + } + override def start(): Unit = { super.start() if (!Utils.isDynamicAllocationEnabled(conf)) { @@ -88,7 +100,8 @@ private[spark] class KubernetesClusterSchedulerBackend( if (shouldDeleteExecutors) { Utils.tryLogNonFatalError { -kubernetesClient.pods() +kubernetesClient + .pods() .withLabel(SPARK_APP_ID_LABEL, applicationId()) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .delete() @@ -120,7 +133,8 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] { -kubernetesClient.pods() +kubernetesClient + .pods() .withLabel(SPARK_APP_ID_LABEL, applicationId()) .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*) @@ -133,7 +147,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) -extends DriverEndpoint(rpcEnv, sparkProperties) { + extends DriverEndpoint(rpcEnv, sparkProperties) { override def onDisconnected(rpcAddress: RpcAddress): Unit = { // Don't do anything besides disabling the executor - allow the Kubernetes API events to diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index 75232f7b98b04..6e182bed459f8 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -37,6 +37,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn private val
[jira] [Commented] (SPARK-19261) Support `ALTER TABLE table_name ADD COLUMNS(..)` statement
[ https://issues.apache.org/jira/browse/SPARK-19261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723382#comment-16723382 ] nirav patel commented on SPARK-19261: - It happens for `Replace columns` as well: `Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: alter table replace columns(line 2, pos 6)` > Support `ALTER TABLE table_name ADD COLUMNS(..)` statement > -- > > Key: SPARK-19261 > URL: https://issues.apache.org/jira/browse/SPARK-19261 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: StanZhai >Assignee: Xin Wu >Priority: Major > Fix For: 2.2.0 > > > We should support the `ALTER TABLE table_name ADD COLUMNS(..)` statement, which > could already be used in versions < 2.x. > This is very useful for those who want to upgrade their Spark version to 2.x. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26386) [K8s] Add configuration for executor OwnerReference when running in client mode
[ https://issues.apache.org/jira/browse/SPARK-26386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nihar Sheth resolved SPARK-26386. - Resolution: Not A Problem Attempted reinvention of "spark.kubernetes.driver.pod.name", which handles this use case. > [K8s] Add configuration for executor OwnerReference when running in client > mode > --- > > Key: SPARK-26386 > URL: https://issues.apache.org/jira/browse/SPARK-26386 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 3.0.0 >Reporter: Nihar Sheth >Priority: Major > > When running in cluster mode, the executor pods have their owner references > set to the driver pod, which should handle cleanup when the driver pod dies. > In client mode, though, the driver is not always a pod, so there is no owner > reference set at all. It would be useful to offer a configuration to support > setting an executor's owner reference in client mode, for situations where we > want the executors' lifecycles tied to another pod (for example, if there's a > pod on the cluster that wants to run a Spark job in client mode on the same > cluster, and we want to clean up the executors if the driver pod here dies). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26246) Infer timestamp types from JSON
[ https://issues.apache.org/jira/browse/SPARK-26246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maxim Gekk updated SPARK-26246: --- Summary: Infer timestamp types from JSON (was: Infer date and timestamp types from JSON) > Infer timestamp types from JSON > --- > > Key: SPARK-26246 > URL: https://issues.apache.org/jira/browse/SPARK-26246 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Priority: Minor > > Currently, DateType and TimestampType cannot be inferred from JSON. To parse > JSON string, you have to specify schema explicitly if JSON input contains > dates or timestamps. This ticket aims to extend JsonInferSchema to support > such inferring. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26246) Infer timestamp types from JSON
[ https://issues.apache.org/jira/browse/SPARK-26246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maxim Gekk updated SPARK-26246: --- Description: Currently, TimestampType cannot be inferred from JSON. To parse JSON string, you have to specify schema explicitly if JSON input contains timestamps. This ticket aims to extend JsonInferSchema to support such inferring. (was: Currently, DateType and TimestampType cannot be inferred from JSON. To parse JSON string, you have to specify schema explicitly if JSON input contains dates or timestamps. This ticket aims to extend JsonInferSchema to support such inferring.) > Infer timestamp types from JSON > --- > > Key: SPARK-26246 > URL: https://issues.apache.org/jira/browse/SPARK-26246 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Priority: Minor > > Currently, TimestampType cannot be inferred from JSON. To parse JSON string, > you have to specify schema explicitly if JSON input contains timestamps. This > ticket aims to extend JsonInferSchema to support such inferring. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
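Outside Spark, the inference this ticket proposes can be sketched in a few lines: sample the string values, try a timestamp pattern, and fall back to string when anything fails to parse. The function name and the single hard-coded pattern below are illustrative only — Spark's JsonInferSchema would honor the configurable `timestampFormat` option rather than one fixed pattern.

```python
from datetime import datetime

# Hypothetical sketch; Spark's real JsonInferSchema is driven by the
# "timestampFormat" JSON option, not a single hard-coded pattern.
TIMESTAMP_PATTERN = "%Y-%m-%dT%H:%M:%S"

def infer_json_type(sample_values):
    """Return 'TimestampType' if every sampled string parses as a
    timestamp, otherwise fall back to 'StringType'."""
    for value in sample_values:
        try:
            datetime.strptime(value, TIMESTAMP_PATTERN)
        except (ValueError, TypeError):
            return "StringType"
    return "TimestampType"
```

With this sketch, a column sampled as `["2018-12-17T10:11:12"]` infers as a timestamp, while any non-conforming value keeps the column as string — the same conservative fallback the real schema inference needs.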
[jira] [Commented] (SPARK-24933) SinkProgress should report written rows
[ https://issues.apache.org/jira/browse/SPARK-24933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723316#comment-16723316 ] ASF GitHub Bot commented on SPARK-24933: asfgit closed pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress URL: https://github.com/apache/spark/pull/21919 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala index d46c4139011da..07d2b8a5dc420 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -232,6 +232,27 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest { } } + test("streaming - sink progress is produced") { +/* ensure sink progress is correctly produced. 
*/ +val input = MemoryStream[String] +val topic = newTopic() +testUtils.createTopic(topic) + +val writer = createKafkaWriter( + input.toDF(), + withTopic = Some(topic), + withOutputMode = Some(OutputMode.Update()))() + +try { + input.addData("1", "2", "3") + failAfter(streamingTimeout) { +writer.processAllAvailable() + } + assert(writer.lastProgress.sink.numOutputRows == 3L) +} finally { + writer.stop() +} + } test("streaming - write data with bad schema") { val input = MemoryStream[String] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 9a1fe1e0a328b..d7e20eed4cbc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{LongAccumulator, Utils} /** * Deprecated logical plan for writing data into data source v2. 
This is being replaced by more @@ -47,6 +47,8 @@ case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPl case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan) extends UnaryExecNode { + var commitProgress: Option[StreamWriterCommitProgress] = None + override def child: SparkPlan = query override def output: Seq[Attribute] = Nil @@ -55,6 +57,7 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark val useCommitCoordinator = writeSupport.useCommitCoordinator val rdd = query.execute() val messages = new Array[WriterCommitMessage](rdd.partitions.length) +val totalNumRowsAccumulator = new LongAccumulator() logInfo(s"Start processing data source write support: $writeSupport. " + s"The input RDD has ${messages.length} partitions.") @@ -65,15 +68,18 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark (context: TaskContext, iter: Iterator[InternalRow]) => DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator), rdd.partitions.indices, -(index, message: WriterCommitMessage) => { - messages(index) = message - writeSupport.onDataWriterCommit(message) +(index, result: DataWritingSparkTaskResult) => { + val commitMessage = result.writerCommitMessage + messages(index) = commitMessage + totalNumRowsAccumulator.add(result.numRows) + writeSupport.onDataWriterCommit(commitMessage) } ) logInfo(s"Data source write support $writeSupport is committing.") writeSupport.commit(messages) logInfo(s"Data source write support $writeSupport committed.") + commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value)) } catch { case cause: Throwable => logError(s"Data source write support $writeSupport is aborting.") @@ -102,7 +108,7 @@ object DataWritingSparkTask extends Logging { writerFactory: DataWriterFactory, context: TaskContext, iter: Iterator[InternalRow], - useCommitCoordinator: Boolean):
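The core of the diff above — each write task returns its commit message together with a row count, and the driver folds the counts into a `LongAccumulator` before exposing the total as commit progress — can be sketched without Spark as follows. `WriteTaskResult` and `run_write_job` are hypothetical stand-ins for `DataWritingSparkTaskResult` and the driver-side loop, not Spark's actual API.

```python
from dataclasses import dataclass

@dataclass
class WriteTaskResult:
    # Mirrors the role of DataWritingSparkTaskResult in the diff:
    # the commit message plus the number of rows the task wrote.
    commit_message: str
    num_rows: int

def run_write_job(partitions):
    """Write each partition, collect commit messages, and accumulate
    the total row count (the LongAccumulator role in the diff)."""
    messages = []
    total_rows = 0
    for index, rows in enumerate(partitions):
        # Stand-in for the per-task write; the message is a placeholder.
        result = WriteTaskResult(commit_message=f"partition-{index}",
                                 num_rows=len(rows))
        messages.append(result.commit_message)
        total_rows += result.num_rows
    return messages, total_rows
```

The design point is that the row count rides alongside the commit message per task, so the driver can report `numOutputRows` without a second pass over the data.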
[jira] [Resolved] (SPARK-24933) SinkProgress should report written rows
[ https://issues.apache.org/jira/browse/SPARK-24933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-24933. - Resolution: Fixed Assignee: Vaclav Kosar Fix Version/s: 3.0.0 > SinkProgress should report written rows > --- > > Key: SPARK-24933 > URL: https://issues.apache.org/jira/browse/SPARK-24933 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.3.1 >Reporter: Vaclav Kosar >Assignee: Vaclav Kosar >Priority: Major > Fix For: 3.0.0 > > > SinkProgress should report properties similar to SourceProgress, as long as > they are available for the given Sink. The count of written rows is a metric available > for all Sinks. Since the relevant progress information is with respect to > committed rows, the ideal object to carry this info is WriterCommitMessage. For > brevity the implementation will focus only on Sinks with API V2 and on Micro > Batch mode. Implementation for Continuous mode will be provided at a later date. > h4. Before > {code} > {"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider@3c0bd317"} > {code} > h4. After > {code} > {"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider@3c0bd317","numOutputRows":5000} > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26386) [K8s] Add configuration for executor OwnerReference when running in client mode
[ https://issues.apache.org/jira/browse/SPARK-26386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723299#comment-16723299 ] Nihar Sheth commented on SPARK-26386: - Ah, yes, that seems like it does...Testing it out to make sure it handles our use cases, will close this if it does. Thanks! > [K8s] Add configuration for executor OwnerReference when running in client > mode > --- > > Key: SPARK-26386 > URL: https://issues.apache.org/jira/browse/SPARK-26386 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 3.0.0 >Reporter: Nihar Sheth >Priority: Major > > When running in cluster mode, the executor pods have their owner references > set to the driver pod, which should handle cleanup when the driver pod dies. > In client mode, though, the driver is not always a pod, so there is no owner > reference set at all. It would be useful to offer a configuration to support > setting an executor's owner reference in client mode, for situations where we > want the executors' lifecycles tied to another pod (for example, if there's a > pod on the cluster that wants to run a Spark job in client mode on the same > cluster, and we want to clean up the executors if the driver pod here dies). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26386) [K8s] Add configuration for executor OwnerReference when running in client mode
[ https://issues.apache.org/jira/browse/SPARK-26386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723281#comment-16723281 ] Nihar Sheth commented on SPARK-26386: - I'll be working on this > [K8s] Add configuration for executor OwnerReference when running in client > mode > --- > > Key: SPARK-26386 > URL: https://issues.apache.org/jira/browse/SPARK-26386 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 3.0.0 >Reporter: Nihar Sheth >Priority: Major > > When running in cluster mode, the executor pods have their owner references > set to the driver pod, which should handle cleanup when the driver pod dies. > In client mode, though, the driver is not always a pod, so there is no owner > reference set at all. It would be useful to offer a configuration to support > setting an executor's owner reference in client mode, for situations where we > want the executors' lifecycles tied to another pod (for example, if there's a > pod on the cluster that wants to run a Spark job in client mode on the same > cluster, and we want to clean up the executors if the driver pod here dies). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26386) [K8s] Add configuration for executor OwnerReference when running in client mode
[ https://issues.apache.org/jira/browse/SPARK-26386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723283#comment-16723283 ] Marcelo Vanzin commented on SPARK-26386: Doesn't {{spark.kubernetes.driver.pod.name}} do that already? > [K8s] Add configuration for executor OwnerReference when running in client > mode > --- > > Key: SPARK-26386 > URL: https://issues.apache.org/jira/browse/SPARK-26386 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 3.0.0 >Reporter: Nihar Sheth >Priority: Major > > When running in cluster mode, the executor pods have their owner references > set to the driver pod, which should handle cleanup when the driver pod dies. > In client mode, though, the driver is not always a pod, so there is no owner > reference set at all. It would be useful to offer a configuration to support > setting an executor's owner reference in client mode, for situations where we > want the executors' lifecycles tied to another pod (for example, if there's a > pod on the cluster that wants to run a Spark job in client mode on the same > cluster, and we want to clean up the executors if the driver pod here dies). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26386) [K8s] Add configuration for executor OwnerReference when running in client mode
Nihar Sheth created SPARK-26386: --- Summary: [K8s] Add configuration for executor OwnerReference when running in client mode Key: SPARK-26386 URL: https://issues.apache.org/jira/browse/SPARK-26386 Project: Spark Issue Type: Improvement Components: Kubernetes Affects Versions: 3.0.0 Reporter: Nihar Sheth When running in cluster mode, the executor pods have their owner references set to the driver pod, which should handle cleanup when the driver pod dies. In client mode, though, the driver is not always a pod, so there is no owner reference set at all. It would be useful to offer a configuration to support setting an executor's owner reference in client mode, for situations where we want the executors' lifecycles tied to another pod (for example, if there's a pod on the cluster that wants to run a Spark job in client mode on the same cluster, and we want to clean up the executors if the driver pod here dies). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
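For context, cleanup in cluster mode works because each executor pod carries an ownerReference pointing at the driver pod, so Kubernetes garbage-collects the executors when the driver dies. A sketch of that metadata follows; the function is hypothetical, but the field names are the standard Kubernetes ownerReference shape.

```python
def executor_owner_reference(driver_pod_name, driver_pod_uid):
    """Build the ownerReference dict an executor pod would carry so
    Kubernetes garbage-collects it with the driver pod. With
    spark.kubernetes.driver.pod.name set, client mode gets the same
    lifecycle tie-in discussed in this issue."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "name": driver_pod_name,
        "uid": driver_pod_uid,
        # controller + blockOwnerDeletion make the driver the managing
        # owner, so deleting it cascades to the executors.
        "controller": True,
        "blockOwnerDeletion": True,
    }
```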
[jira] [Commented] (SPARK-26371) Increase Kafka ConfigUpdater test coverage
[ https://issues.apache.org/jira/browse/SPARK-26371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723243#comment-16723243 ] ASF GitHub Bot commented on SPARK-26371: asfgit closed pull request #23321: [SPARK-26371][SS] Increase kafka ConfigUpdater test coverage. URL: https://github.com/apache/spark/pull/23321 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index de8731c4b774b..1c77906f43b17 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -106,6 +106,11 @@ ${jetty.version} test + + org.mockito + mockito-core + test + org.scalacheck scalacheck_${scala.binary.version} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala new file mode 100644 index 0..bc1b8019f6a63 --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.common.config.SaslConfigs + +import org.apache.spark.SparkEnv +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Kafka + +/** + * Class to conveniently update Kafka config params, while logging the changes + */ +private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, String]) +extends Logging { + private val map = new ju.HashMap[String, Object](kafkaParams.asJava) + + def set(key: String, value: Object): this.type = { +map.put(key, value) +logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}") +this + } + + def setIfUnset(key: String, value: Object): this.type = { +if (!map.containsKey(key)) { + map.put(key, value) + logDebug(s"$module: Set $key to $value") +} +this + } + + def setAuthenticationConfigIfNeeded(): this.type = { +// There are multiple possibilities to log in and applied in the following order: +// - JVM global security provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional parameter needed. +// - Token is provided -> try to log in with scram module using kafka's dynamic JAAS +// configuration. 
+if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) { + logDebug("JVM global security configuration detected, using it for login.") +} else if (KafkaSecurityHelper.isTokenAvailable()) { + logDebug("Delegation token detected, using it for login.") + val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) + set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) + val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM) + require(mechanism.startsWith("SCRAM"), +"Delegation token works only with SCRAM mechanism.") + set(SaslConfigs.SASL_MECHANISM, mechanism) +} +this + } + + def build(): ju.Map[String, Object] = map +} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 6a0c2088ac3d1..ddd629b7ba716 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -24,13 +24,9 @@ import
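Stripped of the Kafka and JAAS specifics, the `KafkaConfigUpdater` in the diff is a small builder over a mutable map whose setters return `this` for chaining. A minimal sketch of the same pattern (logging and the delegation-token logic omitted; the names mirror the diff but this is not the Spark code):

```python
class ConfigUpdater:
    """Builder over a config map with chained set / set_if_unset,
    mirroring KafkaConfigUpdater's structure (auth logic omitted)."""

    def __init__(self, module, params):
        self.module = module
        self._map = dict(params)

    def set(self, key, value):
        self._map[key] = value
        return self  # returning self enables chaining, like `this.type`

    def set_if_unset(self, key, value):
        # Only write the key when the caller has not set it already.
        self._map.setdefault(key, value)
        return self

    def build(self):
        return dict(self._map)
```

The chaining style is what makes call sites like `updater.set(...).set_if_unset(...).build()` read as one configuration expression, which is also why the Scala version returns `this.type`.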
[jira] [Resolved] (SPARK-26371) Increase Kafka ConfigUpdater test coverage
[ https://issues.apache.org/jira/browse/SPARK-26371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-26371. --- Resolution: Fixed Assignee: Gabor Somogyi Fix Version/s: 3.0.0 This is resolved via https://github.com/apache/spark/pull/23321 > Increase Kafka ConfigUpdater test coverage > -- > > Key: SPARK-26371 > URL: https://issues.apache.org/jira/browse/SPARK-26371 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming, Tests >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Assignee: Gabor Somogyi >Priority: Minor > Fix For: 3.0.0 > > > As Kafka delegation token added logic into ConfigUpdater it would be good to > test it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26349) Pyspark should not accept insecure py4j gateways
[ https://issues.apache.org/jira/browse/SPARK-26349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723219#comment-16723219 ] Imran Rashid commented on SPARK-26349: -- I'm working on this > Pyspark should not accept insecure py4j gateways > > > Key: SPARK-26349 > URL: https://issues.apache.org/jira/browse/SPARK-26349 > Project: Spark > Issue Type: Improvement > Components: PySpark >Affects Versions: 3.0.0 >Reporter: Imran Rashid >Priority: Blocker > > Pyspark normally produces a secure py4j connection between python & the jvm, > but it also lets users provide their own connection. Spark should fail fast > if that connection is insecure. > Note this is breaking a public api, which is why this is targeted at 3.0.0. > SPARK-26019 is about adding a warning, but still allowing the old behavior to > work, in 2.x -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26019) pyspark/accumulators.py: "TypeError: object of type 'NoneType' has no len()" in authenticate_and_accum_updates()
[ https://issues.apache.org/jira/browse/SPARK-26019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723218#comment-16723218 ] Imran Rashid commented on SPARK-26019: -- I'm working on this > pyspark/accumulators.py: "TypeError: object of type 'NoneType' has no len()" > in authenticate_and_accum_updates() > > > Key: SPARK-26019 > URL: https://issues.apache.org/jira/browse/SPARK-26019 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.2, 2.4.0 >Reporter: Ruslan Dautkhanov >Priority: Major > > pyspark's accumulator server expects a secure py4j connection between python > and the jvm. Spark will normally create a secure connection, but there is a > public api which allows you to pass in your own py4j connection. (this is > used by zeppelin, at least.) When this happens, you get an error like: > {noformat} > pyspark/accumulators.py: "TypeError: object of type 'NoneType' has no len()" > in authenticate_and_accum_updates() > {noformat} > We should change pyspark to > 1) warn loudly if a user passes in an insecure connection > 1a) I'd like to suggest that we even error out, unless the user actively > opts-in with a config like "spark.python.allowInsecurePy4j=true" > 2) The accumulator server should be changed to allow insecure connections. > note that SPARK-26349 will disallow insecure connections completely in 3.0. 
> > More info on how this occurs: > {code:python} > Exception happened during processing of request from ('127.0.0.1', 43418) > > Traceback (most recent call last): > File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line > 290, in _handle_request_noblock > self.process_request(request, client_address) > File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line > 318, in process_request > self.finish_request(request, client_address) > File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line > 331, in finish_request > self.RequestHandlerClass(request, client_address, self) > File "/opt/cloudera/parcels/Anaconda/lib/python2.7/SocketServer.py", line > 652, in __init__ > self.handle() > File > "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", > line 263, in handle > poll(authenticate_and_accum_updates) > File > "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", > line 238, in poll > if func(): > File > "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/accumulators.py", > line 251, in authenticate_and_accum_updates > received_token = self.rfile.read(len(auth_token)) > TypeError: object of type 'NoneType' has no len() > > {code} > > Error happens here: > https://github.com/apache/spark/blob/cb90617f894fd51a092710271823ec7d1cd3a668/python/pyspark/accumulators.py#L254 > The PySpark code was just running a simple pipeline of > binary_rdd = sc.binaryRecords(full_file_path, record_length).map(lambda .. ) > and then converting it to a dataframe and running a count on it. > It seems error is flaky - on next rerun it didn't happen. (But accumulators > don't actually work.) 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
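The traceback in SPARK-26019 reduces to `self.rfile.read(len(auth_token))` with `auth_token` left as `None`, because the user-supplied gateway was created without a secret. A hypothetical guard with the shape of fix options 1/1a from the description (not the actual pyspark code) looks like:

```python
def authenticate(auth_token, received_token):
    """Sketch of the guard: with an insecure gateway auth_token is None,
    and len(None) is exactly the TypeError from the traceback."""
    if auth_token is None:
        # Option 1a from the report: refuse insecure connections outright
        # instead of crashing later inside the accumulator server.
        raise RuntimeError(
            "insecure py4j gateway: no auth token was configured")
    return received_token == auth_token
```

Raising early turns the confusing `TypeError: object of type 'NoneType' has no len()` into an actionable error at connection time.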
[jira] [Commented] (SPARK-26255) Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch
[ https://issues.apache.org/jira/browse/SPARK-26255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723213#comment-16723213 ] ASF GitHub Bot commented on SPARK-26255: asfgit closed pull request #23312: [SPARK-26255][YARN] Apply user provided UI filters to SQL tab in yarn mode URL: https://github.com/apache/spark/pull/23312 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 67c36aac49266..1289d4be79ea4 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -168,8 +168,10 @@ private[spark] abstract class YarnSchedulerBackend( filterName != null && filterName.nonEmpty && filterParams != null && filterParams.nonEmpty if (hasFilter) { + // SPARK-26255: Append user provided filters(spark.ui.filters) with yarn filter. + val allFilters = filterName + "," + conf.get("spark.ui.filters", "") logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase") - conf.set("spark.ui.filters", filterName) + conf.set("spark.ui.filters", allFilters) filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) } scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Custom error/exception is not thrown for the SQL tab when UI filters are > added in spark-sql launch > -- > > Key: SPARK-26255 > URL: https://issues.apache.org/jira/browse/SPARK-26255 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.3.2 > Environment: 【Test Environment】: > Server OS :-SUSE > No. of Cluster Node:-3 > Spark Version:- 2.3.2 > Hadoop Version:-3.1 >Reporter: Sushanta Sen >Assignee: Chakravarthi >Priority: Major > Fix For: 3.0.0 > > Attachments: command.png, logs_before_fix.png, ui_befofre_fix.png > > > 【Detailed description】:Custom error is not thrown for the SQL tab when UI > filters are added in spark-sql launch > 【Precondition】: > 1.Cluster is up and running【Test step】: > 1. Launch spark sql as below: > [spark-sql --master yarn --conf > spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter > --conf > spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"] > 2. Go to Yarn application list UI link > 3. Launch the application master for the Spark-SQL app ID > 4. It will display an error > 5. Append /executors, /stages, /jobs, /environment, /SQL > 【Expect Output】:An error should be displayed "An error has occurred. Please > check for all the TABS > 【Actual Output】:The error message is displayed for all the tabs except SQL > tab . -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
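The fix in the diff above is a one-line merge of the yarn-injected filter name with any user-provided `spark.ui.filters` value. As a plain-Python sketch — the function name is made up, and unlike the diff's bare string concatenation this version drops empty entries, so no trailing comma is left when the user set no filters:

```python
def merge_ui_filters(yarn_filter, user_filters):
    """Combine the yarn-injected filter with the user-provided
    spark.ui.filters value into one comma-separated list, skipping
    empty entries (the diff's plain concatenation keeps a trailing
    comma when no user filters are configured)."""
    names = [yarn_filter] + [f for f in user_filters.split(",") if f]
    return ",".join(names)
```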
[jira] [Assigned] (SPARK-26255) Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch
[ https://issues.apache.org/jira/browse/SPARK-26255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin reassigned SPARK-26255: -- Assignee: Chakravarthi > Custom error/exception is not thrown for the SQL tab when UI filters are > added in spark-sql launch > -- > > Key: SPARK-26255 > URL: https://issues.apache.org/jira/browse/SPARK-26255 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.3.2 > Environment: 【Test Environment】: > Server OS :-SUSE > No. of Cluster Node:-3 > Spark Version:- 2.3.2 > Hadoop Version:-3.1 >Reporter: Sushanta Sen >Assignee: Chakravarthi >Priority: Major > Fix For: 3.0.0 > > Attachments: command.png, logs_before_fix.png, ui_befofre_fix.png > > > 【Detailed description】:Custom error is not thrown for the SQL tab when UI > filters are added in spark-sql launch > 【Precondition】: > 1.Cluster is up and running【Test step】: > 1. Launch spark sql as below: > [spark-sql --master yarn --conf > spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter > --conf > spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"] > 2. Go to Yarn application list UI link > 3. Launch the application master for the Spark-SQL app ID > 4. It will display an error > 5. Append /executors, /stages, /jobs, /environment, /SQL > 【Expect Output】:An error should be displayed "An error has occurred. Please > check for all the TABS > 【Actual Output】:The error message is displayed for all the tabs except SQL > tab . -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26255) Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch
[ https://issues.apache.org/jira/browse/SPARK-26255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin resolved SPARK-26255. Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23312 [https://github.com/apache/spark/pull/23312] > Custom error/exception is not thrown for the SQL tab when UI filters are > added in spark-sql launch > -- > > Key: SPARK-26255 > URL: https://issues.apache.org/jira/browse/SPARK-26255 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.3.2 > Environment: 【Test Environment】: > Server OS :-SUSE > No. of Cluster Node:-3 > Spark Version:- 2.3.2 > Hadoop Version:-3.1 >Reporter: Sushanta Sen >Priority: Major > Fix For: 3.0.0 > > Attachments: command.png, logs_before_fix.png, ui_befofre_fix.png > > > 【Detailed description】:Custom error is not thrown for the SQL tab when UI > filters are added in spark-sql launch > 【Precondition】: > 1.Cluster is up and running【Test step】: > 1. Launch spark sql as below: > [spark-sql --master yarn --conf > spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter > --conf > spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"] > 2. Go to Yarn application list UI link > 3. Launch the application master for the Spark-SQL app ID > 4. It will display an error > 5. Append /executors, /stages, /jobs, /environment, /SQL > 【Expect Output】:An error should be displayed "An error has occurred. Please > check for all the TABS > 【Actual Output】:The error message is displayed for all the tabs except SQL > tab . -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-26223) Scan: track metastore operation time
[ https://issues.apache.org/jira/browse/SPARK-26223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723111#comment-16723111 ] Yuanjian Li edited comment on SPARK-26223 at 12/17/18 4:10 PM: --- The usage of externalCatalog in `SessionCatalog` and the interface of `ExternalCatalog` are clear clues for this issue. Most interfaces in `ExternalCatalog` are used for DDL; the metastore operations relevant to Scan are listed below: # getTable: called by the analysis rule ResolveRelation's lookupRelation. # listPartitions: 1. Called at execution time by HiveTableScanExec when getting raw partitions. 2. Called by the optimizer rule OptimizeMetadataOnlyQuery's replaceTableScanWithPartitionMetadata. 3. Called by HiveMetastoreCatalog.convertToLogicalRelation when lazy pruning is disabled; the entry point for this scenario is the analysis rule RelationConversions of the Hive analyzer. # listPartitionsByFilter: 1. Same as 2.1. 2. Same as 2.2. 3. Called by CatalogFileIndex; currently we account for this metastore operation time as part of file listing ([discussion link|https://github.com/apache/spark/pull/23327#discussion_r242076144]), which will be split out in this PR. We can cover all these scenarios by appending each phase to a newly added array buffer in the `CatalogTable` parameter list and dumping the records to metrics in the scan node. was (Author: xuanyuan): The usage of externalCatalog in `SessionCatalog` and the interface of `ExternalCatalog` are clear clues for this issue. Most interfaces in `ExternalCatalog` are used for DDL; the metastore operations relevant to Scan are listed below: # getTable: called by the analysis rule ResolveRelation's lookupRelation. # listPartitions: 1. Called at execution time by HiveTableScanExec when getting raw partitions. 2. Called by the optimizer rule OptimizeMetadataOnlyQuery's replaceTableScanWithPartitionMetadata. 3. 
Called by HiveMetastoreCatalog.convertToLogicalRelation when lazy pruning is disabled; the entry point for this scenario is the analysis rule RelationConversions of the Hive analyzer. # listPartitionsByFilter: 1. Same as 2.1. 2. Same as 2.2. 3. Called by CatalogFileIndex; currently we account for this metastore operation time as part of file listing ([discussion link|https://github.com/apache/spark/pull/23327#discussion_r242076144]), which will be split out in this PR. We can cover all these scenarios by appending each phase to a newly added array buffer in the `CatalogTable` parameter list and dumping each phase to metrics in the scan node. > Scan: track metastore operation time > > > Key: SPARK-26223 > URL: https://issues.apache.org/jira/browse/SPARK-26223 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Priority: Major > > The Scan node should report how much time it spent in metastore operations. > Similar to file listing, would be great to also report start and end time for > constructing a timeline. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26223) Scan: track metastore operation time
[ https://issues.apache.org/jira/browse/SPARK-26223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723111#comment-16723111 ] Yuanjian Li commented on SPARK-26223: - The usage of externalCatalog in `SessionCatalog` and the interface of `ExternalCatalog` are clear clues for this issue. Most interfaces in `ExternalCatalog` are used for DDL; the metastore operations relevant to Scan are listed below: # getTable: called by the analysis rule ResolveRelation's lookupRelation. # listPartitions: 1. Called at execution time by HiveTableScanExec when getting raw partitions. 2. Called by the optimizer rule OptimizeMetadataOnlyQuery's replaceTableScanWithPartitionMetadata. 3. Called by HiveMetastoreCatalog.convertToLogicalRelation when lazy pruning is disabled; the entry point for this scenario is the analysis rule RelationConversions of the Hive analyzer. # listPartitionsByFilter: 1. Same as 2.1. 2. Same as 2.2. 3. Called by CatalogFileIndex; currently we account for this metastore operation time as part of file listing ([discussion link|https://github.com/apache/spark/pull/23327#discussion_r242076144]), which will be split out in this PR. We can cover all these scenarios by appending each phase to a newly added array buffer in the `CatalogTable` parameter list and dumping each phase to metrics in the scan node. > Scan: track metastore operation time > > > Key: SPARK-26223 > URL: https://issues.apache.org/jira/browse/SPARK-26223 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Priority: Major > > The Scan node should report how much time it spent in metastore operations. > Similar to file listing, would be great to also report start and end time for > constructing a timeline. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
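The approach proposed in the comment above — append each metastore phase to a buffer carried alongside the plan, then aggregate it into scan-node metrics — can be sketched in a small, self-contained form. All names here (`PhaseRecord`, `recordPhase`, `metastorePhases`) are hypothetical stand-ins, not Spark's actual API; this only shows the shape of "record each call, then sum per phase":

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical record of one metastore call: phase name plus start/end in ns.
case class PhaseRecord(phase: String, startNs: Long, endNs: Long) {
  def durationNs: Long = endNs - startNs
}

// Buffer that would ride along with the catalog table, as the comment proposes.
val metastorePhases = ArrayBuffer.empty[PhaseRecord]

// Wrap a metastore call (getTable, listPartitions, ...) and record its timing.
def recordPhase[T](phase: String)(body: => T): T = {
  val start = System.nanoTime()
  try body
  finally metastorePhases += PhaseRecord(phase, start, System.nanoTime())
}

// Example: time a fake listPartitions call, then aggregate totals per phase —
// the kind of summary a scan node could surface as SQL metrics.
val parts = recordPhase("listPartitions") { Seq("p=1", "p=2") }

def totalNsByPhase: Map[String, Long] =
  metastorePhases.groupBy(_.phase).map { case (p, rs) => p -> rs.map(_.durationNs).sum }
```

Because start and end times are kept rather than just durations, the same buffer could also feed the timeline view the issue asks for.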
[jira] [Assigned] (SPARK-26378) Queries of wide CSV data slowed after SPARK-26151
[ https://issues.apache.org/jira/browse/SPARK-26378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26378: Assignee: Apache Spark > Queries of wide CSV data slowed after SPARK-26151 > - > > Key: SPARK-26378 > URL: https://issues.apache.org/jira/browse/SPARK-26378 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Bruce Robbins >Assignee: Apache Spark >Priority: Major > > A recent change significantly slowed the queries of wide CSV tables. For > example, queries against a 6000 column table slowed by 45-48% when queried > with a single executor. > > The [PR for > SPARK-26151|https://github.com/apache/spark/commit/11e5f1bcd49eec8ab4225d6e68a051b5c6a21cb2] > changed FailureSafeParser#toResultRow such that the returned function > recreates every row, even when the associated input record has no parsing > issues and the user specified no corrupt record field in his/her schema. This > extra processing is responsible for the slowdown. > > I propose that a row should be recreated only if there is a parsing error or > columns need to be shifted due to the existence of a corrupt column field in > the user-supplied schema. Otherwise, the row should be used as-is. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26378) Queries of wide CSV data slowed after SPARK-26151
[ https://issues.apache.org/jira/browse/SPARK-26378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26378: Assignee: (was: Apache Spark) > Queries of wide CSV data slowed after SPARK-26151 > - > > Key: SPARK-26378 > URL: https://issues.apache.org/jira/browse/SPARK-26378 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Bruce Robbins >Priority: Major > > A recent change significantly slowed the queries of wide CSV tables. For > example, queries against a 6000 column table slowed by 45-48% when queried > with a single executor. > > The [PR for > SPARK-26151|https://github.com/apache/spark/commit/11e5f1bcd49eec8ab4225d6e68a051b5c6a21cb2] > changed FailureSafeParser#toResultRow such that the returned function > recreates every row, even when the associated input record has no parsing > issues and the user specified no corrupt record field in his/her schema. This > extra processing is responsible for the slowdown. > > I propose that a row should be recreated only if there is a parsing error or > columns need to be shifted due to the existence of a corrupt column field in > the user-supplied schema. Otherwise, the row should be used as-is. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26378) Queries of wide CSV data slowed after SPARK-26151
[ https://issues.apache.org/jira/browse/SPARK-26378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723095#comment-16723095 ] ASF GitHub Bot commented on SPARK-26378: bersprockets opened a new pull request #23336: [SPARK-26378][SQL] Restore performance of queries against wide CSV tables URL: https://github.com/apache/spark/pull/23336

## What changes were proposed in this pull request?

After recent changes to CSV parsing to return partial results for bad CSV records, queries of wide CSV tables slowed considerably. That recent change resulted in every row being recreated, even when the associated input record had no parsing issues and the user specified no corrupt record field in his/her schema.

In this PR, I propose that a row should be recreated only if there is a parsing error or columns need to be shifted due to the existence of a corrupt column field in the user-supplied schema. Otherwise, the row should be used as-is. This restores performance for the non-error case only.

### Benchmarks:

baseline = commit before partial results change
PR = this PR
master = master branch

The wide table has 6000 columns and 165,000 records, and the narrow table has 12 columns and 82,500,000 records. Tests are run with a single executor. In the following, positive percentages are bad (slower), negative are good (faster).

Wide rows, all good records:

baseline | pr | master | PR diff | master diff
---|---|---|---|---
2.036489 min | 1.990344 min | 2.952561 min | -2.265882% | 44.982923%

Wide rows, all bad records:

baseline | pr | master | PR diff | master diff
---|---|---|---|---
1.660761 min | 3.016839 min | 3.011944 min | 81.653994% | 81.359283%

Both my PR and the master branch are ~81% slower than the baseline when all records are bad but the user specified no corrupt record field in his/her schema. In fact, the master branch is reliably, but slightly, faster here, since it does not call badRecord() in this case.
Wide rows, corrupt record field, all good records:

baseline | pr | master | PR diff | master diff
---|---|---|---|---
2.912467 min | 2.893039 min | 2.905344 min | -0.667056% | -0.244543%

Wide rows, corrupt record field, all bad records:

baseline | pr | master | PR diff | master diff
---|---|---|---|---
2.441417 min | 2.979544 min | 2.957439 min | 22.041620% | 21.136180%

Both my PR and the master branch are ~21-22% slower than the baseline when all records are bad and the user specified a corrupt record field in his/her schema.

Narrow rows, all good records:

baseline | pr | master | diff1 | diff2
---|---|---|---|---
2.004539 min | 1.987183 min | 2.365122 min | -0.865813% | 17.988343%

Narrow rows, corrupt record field, all good records:

baseline | pr | master | diff1 | diff2
---|---|---|---|---
2.390589 min | 2.382100 min | 2.379733 min | -0.355096% | -0.454095%

## How was this patch tested?

All SQL unit tests
Python core and SQL tests

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Queries of wide CSV data slowed after SPARK-26151 > - > > Key: SPARK-26378 > URL: https://issues.apache.org/jira/browse/SPARK-26378 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Bruce Robbins >Priority: Major > > A recent change significantly slowed the queries of wide CSV tables. For > example, queries against a 6000 column table slowed by 45-48% when queried > with a single executor. > > The [PR for > SPARK-26151|https://github.com/apache/spark/commit/11e5f1bcd49eec8ab4225d6e68a051b5c6a21cb2] > changed FailureSafeParser#toResultRow such that the returned function > recreates every row, even when the associated input record has no parsing > issues and the user specified no corrupt record field in his/her schema. 
This > extra processing is responsible for the slowdown. > > I propose that a row should be recreated only if there is a parsing error or > columns need to be shifted due to the existence of a corrupt column field in > the user-supplied schema. Otherwise, the row should be used as-is. -- This message was sent by
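The fix proposed in this thread — hand back the parsed row untouched unless a corrupt-record column forces the layout to shift — can be sketched with simplified stand-in types. This is not `FailureSafeParser`'s actual code; it only models the corrupt-column decision made once when the parser is built (the parse-error branch is omitted):

```scala
// Stand-in for a parsed record: the parsed values plus an optional raw bad record.
case class ParseResult(values: Array[Any], badRecord: Option[String])

// Decide once, up front, whether result rows ever need to be rebuilt,
// instead of rebuilding unconditionally on every record.
def toResultRow(hasCorruptColumn: Boolean): ParseResult => Array[Any] =
  if (hasCorruptColumn)
    // A corrupt-record column shifts the layout, so the row must be rebuilt,
    // appending the raw record (or null when parsing succeeded).
    (r: ParseResult) => r.values :+ r.badRecord.orNull
  else
    // No corrupt column: reuse the parsed row as-is — no per-record copy.
    (r: ParseResult) => r.values

val plain = toResultRow(hasCorruptColumn = false)
val shifted = toResultRow(hasCorruptColumn = true)
val good = ParseResult(Array[Any]("a", "b"), None)
```

The fast path returns the same array instance, which is the per-record allocation the benchmarks above show being avoided.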
[jira] [Commented] (SPARK-20351) Add trait hasTrainingSummary to replace the duplicate code
[ https://issues.apache.org/jira/browse/SPARK-20351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723078#comment-16723078 ] ASF GitHub Bot commented on SPARK-20351: srowen closed pull request #17654: [SPARK-20351] [ML] Add trait hasTrainingSummary to replace the duplicate code URL: https://github.com/apache/spark/pull/17654 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 27a7db0b2f5d4..f2a5c11a34867 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -934,8 +934,8 @@ class LogisticRegressionModel private[spark] ( @Since("2.1.0") val interceptVector: Vector, @Since("1.3.0") override val numClasses: Int, private val isMultinomial: Boolean) - extends ProbabilisticClassificationModel[Vector, LogisticRegressionModel] - with LogisticRegressionParams with MLWritable { + extends ProbabilisticClassificationModel[Vector, LogisticRegressionModel] with MLWritable + with LogisticRegressionParams with HasTrainingSummary[LogisticRegressionTrainingSummary] { require(coefficientMatrix.numRows == interceptVector.size, s"Dimension mismatch! Expected " + s"coefficientMatrix.numRows == interceptVector.size, but ${coefficientMatrix.numRows} != " + @@ -1018,20 +1018,16 @@ class LogisticRegressionModel private[spark] ( @Since("1.6.0") override val numFeatures: Int = coefficientMatrix.numCols - private var trainingSummary: Option[LogisticRegressionTrainingSummary] = None - /** * Gets summary of model on training set. 
An exception is thrown - * if `trainingSummary == None`. + * if `hasSummary` is false. */ @Since("1.5.0") - def summary: LogisticRegressionTrainingSummary = trainingSummary.getOrElse { -throw new SparkException("No training summary available for this LogisticRegressionModel") - } + override def summary: LogisticRegressionTrainingSummary = super.summary /** * Gets summary of model on training set. An exception is thrown - * if `trainingSummary == None` or it is a multiclass model. + * if `hasSummary` is false or it is a multiclass model. */ @Since("2.3.0") def binarySummary: BinaryLogisticRegressionTrainingSummary = summary match { @@ -1062,16 +1058,6 @@ class LogisticRegressionModel private[spark] ( (model, model.getProbabilityCol, model.getPredictionCol) } - private[classification] - def setSummary(summary: Option[LogisticRegressionTrainingSummary]): this.type = { -this.trainingSummary = summary -this - } - - /** Indicates whether a training summary exists for this model instance. */ - @Since("1.5.0") - def hasSummary: Boolean = trainingSummary.isDefined - /** * Evaluates the model on a test dataset. 
* diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala index 1a94aefa3f563..49e9f51368131 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala @@ -87,8 +87,9 @@ private[clustering] trait BisectingKMeansParams extends Params with HasMaxIter @Since("2.0.0") class BisectingKMeansModel private[ml] ( @Since("2.0.0") override val uid: String, -private val parentModel: MLlibBisectingKMeansModel - ) extends Model[BisectingKMeansModel] with BisectingKMeansParams with MLWritable { +private val parentModel: MLlibBisectingKMeansModel) + extends Model[BisectingKMeansModel] with BisectingKMeansParams with MLWritable + with HasTrainingSummary[BisectingKMeansSummary] { @Since("2.0.0") override def copy(extra: ParamMap): BisectingKMeansModel = { @@ -143,28 +144,12 @@ class BisectingKMeansModel private[ml] ( @Since("2.0.0") override def write: MLWriter = new BisectingKMeansModel.BisectingKMeansModelWriter(this) - private var trainingSummary: Option[BisectingKMeansSummary] = None - - private[clustering] def setSummary(summary: Option[BisectingKMeansSummary]): this.type = { -this.trainingSummary = summary -this - } - - /** - * Return true if there exists summary of model. - */ - @Since("2.1.0") - def hasSummary: Boolean = trainingSummary.nonEmpty - /** * Gets summary of model on training set. An exception is - * thrown if `trainingSummary == None`. + * thrown if `hasSummary` is false. */ @Since("2.1.0")
[jira] [Resolved] (SPARK-20351) Add trait hasTrainingSummary to replace the duplicate code
[ https://issues.apache.org/jira/browse/SPARK-20351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-20351. --- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 17654 [https://github.com/apache/spark/pull/17654] > Add trait hasTrainingSummary to replace the duplicate code > -- > > Key: SPARK-20351 > URL: https://issues.apache.org/jira/browse/SPARK-20351 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.2.0 >Reporter: yuhao yang >Assignee: yuhao yang >Priority: Minor > Fix For: 3.0.0 > > > Add a trait HasTrainingSummary to avoid code duplicate related to training > summary. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
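The pattern this PR introduces — one trait holding the optional training summary, mixed into each model instead of duplicating the `trainingSummary`/`setSummary`/`hasSummary` boilerplate — can be sketched minimally. The types below are simplified stand-ins, not the real `org.apache.spark.ml` classes:

```scala
// Simplified sketch of a shared training-summary trait.
trait HasTrainingSummary[T] {
  private var trainingSummary: Option[T] = None

  /** Whether a training summary exists for this model instance. */
  def hasSummary: Boolean = trainingSummary.isDefined

  /** The training summary; throws if `hasSummary` is false. */
  def summary: T = trainingSummary.getOrElse(
    throw new RuntimeException("No training summary available for this model"))

  /** Set by the estimator after fitting; returns this for chaining. */
  def setSummary(s: Option[T]): this.type = { trainingSummary = s; this }
}

// A toy model mixes in the trait instead of re-declaring the fields.
case class ToyTrainingSummary(loss: Double)
class ToyModel extends HasTrainingSummary[ToyTrainingSummary]
```

Each model then only overrides `summary` when it needs a narrower return type, which is what the diff above does for `LogisticRegressionModel`.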
[jira] [Assigned] (SPARK-20351) Add trait hasTrainingSummary to replace the duplicate code
[ https://issues.apache.org/jira/browse/SPARK-20351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-20351: - Assignee: yuhao yang > Add trait hasTrainingSummary to replace the duplicate code > -- > > Key: SPARK-20351 > URL: https://issues.apache.org/jira/browse/SPARK-20351 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.2.0 >Reporter: yuhao yang >Assignee: yuhao yang >Priority: Minor > Fix For: 3.0.0 > > > Add a trait HasTrainingSummary to avoid code duplicate related to training > summary. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20894) Error while checkpointing to HDFS
[ https://issues.apache.org/jira/browse/SPARK-20894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723062#comment-16723062 ] Alexander Panzhin commented on SPARK-20894: --- I don't know why you would place this into a resolved state, but this is 100% definitely NOT resolved. Running on Spark 2.3.0 causes this issue. > Error while checkpointing to HDFS > - > > Key: SPARK-20894 > URL: https://issues.apache.org/jira/browse/SPARK-20894 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.1 > Environment: Ubuntu, Spark 2.1.1, hadoop 2.7 >Reporter: kant kodali >Assignee: Shixiong Zhu >Priority: Major > Fix For: 2.3.0 > > Attachments: driver_info_log, executor1_log, executor2_log > > > Dataset df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 > hours", "24 hours"), df1.col("AppName")).count(); > StreamingQuery query = df2.writeStream().foreach(new > KafkaSink()).option("checkpointLocation","/usr/local/hadoop/checkpoint").outputMode("update").start(); > query.awaitTermination(); > This for some reason fails with the Error > ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) > java.lang.IllegalStateException: Error reading delta file > /usr/local/hadoop/checkpoint/state/0/0/1.delta of HDFSStateStoreProvider[id = > (op=0, part=0), dir = /usr/local/hadoop/checkpoint/state/0/0]: > /usr/local/hadoop/checkpoint/state/0/0/1.delta does not exist > I did clear all the checkpoint data in /usr/local/hadoop/checkpoint/ and all > consumer offsets in Kafka from all brokers prior to running and yet this > error still persists. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26352) join reordering should not change the order of output attributes
[ https://issues.apache.org/jira/browse/SPARK-26352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723050#comment-16723050 ] ASF GitHub Bot commented on SPARK-26352: cloud-fan closed pull request #2: [SPARK-26352][SQL][FOLLOWUP-2.3] Fix missing sameOutput in branch-2.3 URL: https://github.com/apache/spark/pull/2 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index fedef68bf8513..503e20490a92c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -99,7 +99,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { createOrderedJoin(input, conditions) } - if (p.sameOutput(reordered)) { + if (sameOutput(p, reordered)) { reordered } else { // Reordering the joins have changed the order of the columns. @@ -107,6 +107,21 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { Project(p.output, reordered) } } + + /** + * Returns true iff output of both plans are semantically the same, ie.: + * - they contain the same number of `Attribute`s; + * - references are the same; + * - the order is equal too. + * NOTE: this is copied over from SPARK-25691 from master. 
+ */ + def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +val output1 = plan1.output +val output2 = plan2.output +output1.length == output2.length && output1.zip(output2).forall { + case (a1, a2) => a1.semanticEquals(a2) +} + } } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala index c8a4b6da4fcd0..9526cbca77094 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala @@ -300,8 +300,8 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase { val optimized = Optimize.execute(analyzed) val expected = groundTruthBestPlan.analyze -assert(analyzed.sameOutput(expected)) // if this fails, the expected plan itself is incorrect -assert(analyzed.sameOutput(optimized)) +assert(sameOutput(analyzed, expected)) // if this fails, the expected plan itself is incorrect +assert(sameOutput(analyzed, optimized)) compareJoinOrder(optimized, expected) } @@ -309,4 +309,19 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase { private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = { plans.map(_.output).reduce(_ ++ _) } + + /** + * Returns true iff output of both plans are semantically the same, ie.: + * - they contain the same number of `Attribute`s; + * - references are the same; + * - the order is equal too. + * NOTE: this is copied over from SPARK-25691 from master. + */ + def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +val output1 = plan1.output +val output2 = plan2.output +output1.length == output2.length && output1.zip(output2).forall { + case (a1, a2) => a1.semanticEquals(a2) +} + } } This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > join reordering should not change the order of output attributes > > > Key: SPARK-26352 > URL: https://issues.apache.org/jira/browse/SPARK-26352 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Kris Mok >Assignee: Kris Mok >Priority: Major > Fix For: 2.3.3, 2.4.1, 3.0.0 > > > The optimizer rule {{org.apache.spark.sql.catalyst.optimizer.ReorderJoin}} > performs join reordering on inner joins. This was introduced from SPARK-12032 > in 2015-12. > After it had reordered the joins, though, it didn't check whether or not the > column order (in terms of the {{output}} attribute list) is still the same as > before. Thus, it's possible to
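The `sameOutput` helper backported in the diff above compares two plans' outputs attribute-by-attribute using semantic equality, so a mere reordering of the same attributes fails the check (which is exactly when `ReorderJoin` must wrap the result in a `Project`). A self-contained illustration with a toy attribute — equality by expression id is an assumption standing in for Catalyst's `semanticEquals`:

```scala
// Toy attribute: semantic equality is by expression id, not by name.
case class Attr(name: String, exprId: Long) {
  def semanticEquals(other: Attr): Boolean = exprId == other.exprId
}

// Same check as the backported helper: equal length plus pairwise semantic
// equality, which also forces the order of the two outputs to match.
def sameOutput(output1: Seq[Attr], output2: Seq[Attr]): Boolean =
  output1.length == output2.length &&
    output1.zip(output2).forall { case (a1, a2) => a1.semanticEquals(a2) }
```

Renaming an attribute keeps the outputs "the same" here, but swapping two columns does not — matching the bug this JIRA fixes.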
[jira] [Updated] (SPARK-26385) YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in cache
[ https://issues.apache.org/jira/browse/SPARK-26385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] T M updated SPARK-26385: Summary: YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in cache (was: YARN - Spark Stateful Structured streaming ) > YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in > cache > --- > > Key: SPARK-26385 > URL: https://issues.apache.org/jira/browse/SPARK-26385 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 > Environment: Hadoop 2.6.0, Spark 2.4.0 >Reporter: T M >Priority: Major > > > Hello, > > I have Spark Structured Streaming job which is runnning on YARN(Hadoop 2.6.0, > Spark 2.4.0). After 25-26 hours, my job stops working with following error: > {code:java} > 2018-12-16 22:35:17 ERROR > org.apache.spark.internal.Logging$class.logError(Logging.scala:91): Query > TestQuery[id = a61ce197-1d1b-4e82-a7af-60162953488b, runId = > a56878cf-dfc7-4f6a-ad48-02cf738ccc2f] terminated with error > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > token (token for bdcs: HDFS_DELEGATION_TOKEN owner=REMOVED, renewer=yarn, > realUser=, issueDate=1544903057122, maxDate=1545507857122, > sequenceNumber=10314, masterKeyId=344) can't be found in cache at > org.apache.hadoop.ipc.Client.call(Client.java:1470) at > org.apache.hadoop.ipc.Client.call(Client.java:1401) at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) > at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752) > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) at > org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977) at > org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:133) at > org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1120) at > org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1116) at > org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at > org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1116) at > org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1581) at > org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:544) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:542) > at > 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) > at >
[jira] [Commented] (SPARK-26352) join reordering should not change the order of output attributes
[ https://issues.apache.org/jira/browse/SPARK-26352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723039#comment-16723039 ] ASF GitHub Bot commented on SPARK-26352: cloud-fan closed pull request #23330: [SPARK-26352][SQL][FOLLOWUP-2.4] Fix missing sameOutput in branch-2.4 URL: https://github.com/apache/spark/pull/23330 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 0b6471289a471..2feb4720f9f92 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -100,7 +100,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { createOrderedJoin(input, conditions) } - if (p.sameOutput(reordered)) { + if (sameOutput(p, reordered)) { reordered } else { // Reordering the joins have changed the order of the columns. @@ -108,6 +108,21 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { Project(p.output, reordered) } } + + /** + * Returns true iff output of both plans are semantically the same, ie.: + * - they contain the same number of `Attribute`s; + * - references are the same; + * - the order is equal too. + * NOTE: this is copied over from SPARK-25691 from master. 
+ */ + def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +val output1 = plan1.output +val output2 = plan2.output +output1.length == output2.length && output1.zip(output2).forall { + case (a1, a2) => a1.semanticEquals(a2) +} + } } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala index c94a8b9e318f6..38a70f0691dd4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala @@ -291,8 +291,8 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase { val optimized = Optimize.execute(analyzed) val expected = groundTruthBestPlan.analyze -assert(analyzed.sameOutput(expected)) // if this fails, the expected plan itself is incorrect -assert(analyzed.sameOutput(optimized)) +assert(sameOutput(analyzed, expected)) // if this fails, the expected plan itself is incorrect +assert(sameOutput(analyzed, optimized)) compareJoinOrder(optimized, expected) } @@ -300,4 +300,19 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase { private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = { plans.map(_.output).reduce(_ ++ _) } + + /** + * Returns true iff output of both plans are semantically the same, ie.: + * - they contain the same number of `Attribute`s; + * - references are the same; + * - the order is equal too. + * NOTE: this is copied over from SPARK-25691 from master. + */ + def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = { +val output1 = plan1.output +val output2 = plan2.output +output1.length == output2.length && output1.zip(output2).forall { + case (a1, a2) => a1.semanticEquals(a2) +} + } } This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > join reordering should not change the order of output attributes > > > Key: SPARK-26352 > URL: https://issues.apache.org/jira/browse/SPARK-26352 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Kris Mok >Assignee: Kris Mok >Priority: Major > Fix For: 2.3.3, 2.4.1, 3.0.0 > > > The optimizer rule {{org.apache.spark.sql.catalyst.optimizer.ReorderJoin}} > performs join reordering on inner joins. This was introduced from SPARK-12032 > in 2015-12. > After it had reordered the joins, though, it didn't check whether or not the > column order (in terms of the {{output}} attribute list) is still the same as > before. Thus, it's possible
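The backported `sameOutput` helper above checks that two plans expose the same attributes, with the same references, in the same order. Its comparison logic can be sketched standalone (the `Attr` class and id-based `semantic_equals` below are illustrative stand-ins for Catalyst's `Attribute`, not Spark's actual implementation):

```python
class Attr:
    """Illustrative stand-in for Catalyst's Attribute: identity by expression id."""
    def __init__(self, name, expr_id):
        self.name = name
        self.expr_id = expr_id

    def semantic_equals(self, other):
        # Catalyst compares attributes by exprId, ignoring cosmetic
        # differences such as the capitalization of the name.
        return self.expr_id == other.expr_id


def same_output(output1, output2):
    # All three conditions from the helper's docstring: same number of
    # attributes, same references, same order.
    return (len(output1) == len(output2)
            and all(a1.semantic_equals(a2) for a1, a2 in zip(output1, output2)))


a, b = Attr("a", 1), Attr("b", 2)
same_output([a, b], [a, b])  # True: identical output
same_output([a, b], [b, a])  # False: join reordering changed the column order
```

When this check fails, `ReorderJoin` wraps the reordered plan in a `Project(p.output, reordered)` to restore the original column order, which is exactly the fix SPARK-26352 describes.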
[jira] [Commented] (SPARK-26243) Use java.time API for parsing timestamps and dates from JSON
[ https://issues.apache.org/jira/browse/SPARK-26243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722995#comment-16722995 ] ASF GitHub Bot commented on SPARK-26243: asfgit closed pull request #23329: [SPARK-26243][SQL][followup] fix code style issues in TimestampFormatter.scala URL: https://github.com/apache/spark/pull/23329 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala new file mode 100644 index 0..9e8d51cc65f03 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.util + +import java.time.{Instant, ZoneId} +import java.util.Locale + +import scala.util.Try + +import org.apache.commons.lang3.time.FastDateFormat + +import org.apache.spark.sql.internal.SQLConf + +sealed trait DateFormatter { + def parse(s: String): Int // returns days since epoch + def format(days: Int): String +} + +class Iso8601DateFormatter( +pattern: String, +locale: Locale) extends DateFormatter with DateTimeFormatterHelper { + + private val formatter = buildFormatter(pattern, locale) + private val UTC = ZoneId.of("UTC") + + private def toInstant(s: String): Instant = { +val temporalAccessor = formatter.parse(s) +toInstantWithZoneId(temporalAccessor, UTC) + } + + override def parse(s: String): Int = { +val seconds = toInstant(s).getEpochSecond +val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY) +days.toInt + } + + override def format(days: Int): String = { +val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY) +formatter.withZone(UTC).format(instant) + } +} + +class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter { + private val format = FastDateFormat.getInstance(pattern, locale) + + override def parse(s: String): Int = { +val milliseconds = format.parse(s).getTime +DateTimeUtils.millisToDays(milliseconds) + } + + override def format(days: Int): String = { +val date = DateTimeUtils.toJavaDate(days) +format.format(date) + } +} + +class LegacyFallbackDateFormatter( +pattern: String, +locale: Locale) extends LegacyDateFormatter(pattern, locale) { + override def parse(s: String): Int = { +Try(super.parse(s)).orElse { + // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards + // compatibility. + Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime)) +}.getOrElse { + // In Spark 1.5.0, we store the data as number of days since epoch in string. + // So, we just convert it to Int. 
+ s.toInt +} + } +} + +object DateFormatter { + def apply(format: String, locale: Locale): DateFormatter = { +if (SQLConf.get.legacyTimeParserEnabled) { + new LegacyFallbackDateFormatter(format, locale) +} else { + new Iso8601DateFormatter(format, locale) +} + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala new file mode 100644 index 0..b85101d38d9e6 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the
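The `Iso8601DateFormatter` in the diff above converts between date strings and days since the Unix epoch by going through epoch seconds and floor-dividing by the number of seconds per day; floor division (`Math.floorDiv`) matters for pre-1970 dates. The round trip can be sketched as follows (an illustrative sketch of the arithmetic, not the Spark API):

```python
from datetime import datetime, timezone
import math

SECONDS_PER_DAY = 60 * 60 * 24  # mirrors DateTimeUtils.SECONDS_PER_DAY

def parse_to_days(s, pattern="%Y-%m-%d"):
    # Interpret the string as a UTC instant, then floor-divide epoch seconds
    # by seconds per day, as Iso8601DateFormatter.parse does with Math.floorDiv.
    dt = datetime.strptime(s, pattern).replace(tzinfo=timezone.utc)
    return math.floor(dt.timestamp() / SECONDS_PER_DAY)

def format_days(days, pattern="%Y-%m-%d"):
    # Inverse direction: days since epoch back to a UTC date string.
    return datetime.fromtimestamp(days * SECONDS_PER_DAY, tz=timezone.utc).strftime(pattern)

parse_to_days("1970-01-02")   # 1
parse_to_days("1969-12-31")   # -1 (floor division, not truncation toward zero)
format_days(1)                # "1970-01-02"
```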
[jira] [Updated] (SPARK-26385) YARN - Spark Stateful Structured streaming
[ https://issues.apache.org/jira/browse/SPARK-26385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] T M updated SPARK-26385: Description: Hello, I have Spark Structured Streaming job which is runnning on YARN(Hadoop 2.6.0, Spark 2.4.0). After 25-26 hours, my job stops working with following error: {code:java} 2018-12-16 22:35:17 ERROR org.apache.spark.internal.Logging$class.logError(Logging.scala:91): Query TestQuery[id = a61ce197-1d1b-4e82-a7af-60162953488b, runId = a56878cf-dfc7-4f6a-ad48-02cf738ccc2f] terminated with error org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for bdcs: HDFS_DELEGATION_TOKEN owner=REMOVED, renewer=yarn, realUser=, issueDate=1544903057122, maxDate=1545507857122, sequenceNumber=10314, masterKeyId=344) can't be found in cache at org.apache.hadoop.ipc.Client.call(Client.java:1470) at org.apache.hadoop.ipc.Client.call(Client.java:1401) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752) at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977) at org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:133) at org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1120) at org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1116) at 
org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1116) at org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1581) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:544) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){code} ^It is important to notice that I tried usual fix for this kind of problems:^ {code:java} --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" {code} was: Hello, I have Spark Structured Streaming job which is runnning on YARN(Hadoop 2.6.0, Spark 2.4.0). After 25-26 hours, my job
[jira] [Created] (SPARK-26385) YARN - Spark Stateful Structured streaming
T M created SPARK-26385: --- Summary: YARN - Spark Stateful Structured streaming Key: SPARK-26385 URL: https://issues.apache.org/jira/browse/SPARK-26385 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Environment: Hadoop 2.6.0, Spark 2.4.0 Reporter: T M Hello, I have Spark Structured Streaming job which is runnning on YARN(Hadoop 2.6.0, Spark 2.4.0). After 25-26 hours, my job stops working with following error: {code:java} 2018-12-16 22:35:17 ERROR org.apache.spark.internal.Logging$class.logError(Logging.scala:91): Query TestQuery[id = a61ce197-1d1b-4e82-a7af-60162953488b, runId = a56878cf-dfc7-4f6a-ad48-02cf738ccc2f] terminated with error org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for bdcs: HDFS_DELEGATION_TOKEN owner=REMOVED, renewer=yarn, realUser=, issueDate=1544903057122, maxDate=1545507857122, sequenceNumber=10314, masterKeyId=344) can't be found in cache at org.apache.hadoop.ipc.Client.call(Client.java:1470) at org.apache.hadoop.ipc.Client.call(Client.java:1401) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752) at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977) at org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:133) at 
org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1120) at org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1116) at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1116) at org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1581) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:544) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){code} ^It is important to notice that I tried the usual fix for this kind of
[jira] [Updated] (SPARK-26318) Deprecate function merge in Row
[ https://issues.apache.org/jira/browse/SPARK-26318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-26318: -- Priority: Trivial (was: Minor) Summary: Deprecate function merge in Row (was: Enhance function merge performance in Row) > Deprecate function merge in Row > --- > > Key: SPARK-26318 > URL: https://issues.apache.org/jira/browse/SPARK-26318 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Liang Li >Priority: Trivial > > Enhance the performance of function merge in Row. > For example, a single Row.merge call on the input > val row1 = Row("name", "work", 2314, "null", 1, "") takes 108458 > milliseconds; > after some enhancement it takes only 24967 milliseconds. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
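For context, `Row.merge` concatenates the field values of its argument Rows into a single Row; the reported cost came from how that concatenation was performed. A rough sketch of the semantics (illustrative only, not Spark's implementation):

```python
def merge(*rows):
    # Concatenate the field values of all input rows into one flat tuple,
    # analogous to Row.merge combining several Rows into a single Row.
    merged = []
    for row in rows:
        merged.extend(row)
    return tuple(merged)

merge(("name", "work"), (2314, "null"), (1, ""))
# -> ("name", "work", 2314, "null", 1, "")
```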
[jira] [Created] (SPARK-26383) NPE when use DataFrameReader.jdbc with wrong URL
clouds created SPARK-26383: -- Summary: NPE when use DataFrameReader.jdbc with wrong URL Key: SPARK-26383 URL: https://issues.apache.org/jira/browse/SPARK-26383 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 2.4.0 Reporter: clouds When passing a wrong URL to jdbc: {code:java} val opts = Map( "url" -> "jdbc:mysql://localhost/db", "dbtable" -> "table", "driver" -> "org.postgresql.Driver" ) var df = spark.read.format("jdbc").options(opts).load {code} It throws an NPE instead of reporting that the connection failed. (Note that the URL and driver do not match here.) {code:java} Exception in thread "main" java.lang.NullPointerException at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:71) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:210) at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167) {code} As the [postgresql jdbc driver document|https://jdbc.postgresql.org/development/privateapi/org/postgresql/Driver.html#connect-java.lang.String-java.util.Properties-] says, the driver should return "null" if it realizes it is the wrong kind of driver to connect to the given URL, while [ConnectionFactory|https://github.com/apache/spark/blob/e743e848484bf7d97e1b4f33ea83f8520ae7da04/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala#L56] does not check whether conn is null: {code:java} val conn: Connection = JdbcUtils.createConnectionFactory(options)() {code} and tries to close the conn anyway: {code:java} try { ... 
} finally { conn.close() } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
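The fix the report implies is a null check between creating and using the connection: per the JDBC contract, `Driver.connect` returns null for a URL it does not handle, so the factory should fail fast with a descriptive error rather than let a later dereference become an NPE. A minimal sketch of that defensive pattern (hypothetical names, not Spark's actual code):

```python
class PostgresOnlyDriver:
    """Toy driver honoring the JDBC contract: return None for foreign URLs."""
    def connect(self, url):
        return object() if url.startswith("jdbc:postgresql:") else None

def create_connection(driver, url):
    conn = driver.connect(url)
    if conn is None:
        # Fail fast with a clear message instead of an eventual NullPointerException.
        raise ValueError(
            f"Driver {type(driver).__name__} does not accept URL {url}; "
            "check that the url and driver options match.")
    return conn

create_connection(PostgresOnlyDriver(), "jdbc:postgresql://localhost/db")   # ok
# create_connection(PostgresOnlyDriver(), "jdbc:mysql://localhost/db")      # raises ValueError
```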
[jira] [Created] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
Maxim Gekk created SPARK-26384: -- Summary: CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled Key: SPARK-26384 URL: https://issues.apache.org/jira/browse/SPARK-26384 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Maxim Gekk Starting from the commit [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2] , add logging like in the comment https://github.com/apache/spark/pull/23150#discussion_r242021998 and run: {code:shell} $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true {code} and in the shell: {code:scala} scala> spark.conf.get("spark.sql.legacy.timeParser.enabled") res0: String = true scala> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo") scala> spark.read.option("inferSchema", "true").option("header", "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema() 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is being used root |-- _c0: timestamp (nullable = true) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26382) prefix sorter should handle -0.0
[ https://issues.apache.org/jira/browse/SPARK-26382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26382: Assignee: Apache Spark (was: Wenchen Fan) > prefix sorter should handle -0.0 > > > Key: SPARK-26382 > URL: https://issues.apache.org/jira/browse/SPARK-26382 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wenchen Fan >Assignee: Apache Spark >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26382) prefix sorter should handle -0.0
[ https://issues.apache.org/jira/browse/SPARK-26382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26382: Assignee: Wenchen Fan (was: Apache Spark) > prefix sorter should handle -0.0 > > > Key: SPARK-26382 > URL: https://issues.apache.org/jira/browse/SPARK-26382 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26382) prefix sorter should handle -0.0
[ https://issues.apache.org/jira/browse/SPARK-26382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722791#comment-16722791 ] ASF GitHub Bot commented on SPARK-26382: cloud-fan opened a new pull request #23334: [SPARK-26382][] prefix sorter should handle -0.0 URL: https://github.com/apache/spark/pull/23334 ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > prefix sorter should handle -0.0 > > > Key: SPARK-26382 > URL: https://issues.apache.org/jira/browse/SPARK-26382 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26382) prefix sorter should handle -0.0
Wenchen Fan created SPARK-26382: --- Summary: prefix sorter should handle -0.0 Key: SPARK-26382 URL: https://issues.apache.org/jira/browse/SPARK-26382 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Wenchen Fan Assignee: Wenchen Fan -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
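For background on why -0.0 needs special handling: IEEE 754 gives -0.0 and 0.0 distinct bit patterns even though they compare equal, so a sort prefix derived from the raw bits would order equal values inconsistently. The usual remedy, sketched below, is to canonicalize the sign of zero before building the prefix (a sketch of the idea, not Spark's actual prefix encoding):

```python
import struct

def double_bits(x):
    # Raw IEEE 754 bit pattern of a 64-bit double.
    return struct.unpack("<Q", struct.pack("<d", x))[0]

def normalize_zero(x):
    # -0.0 == 0.0 under IEEE 754 comparison, so this maps -0.0 to +0.0
    # while leaving every other value unchanged.
    return 0.0 if x == 0.0 else x

double_bits(-0.0) == double_bits(0.0)                                  # False: distinct bits
double_bits(normalize_zero(-0.0)) == double_bits(normalize_zero(0.0))  # True after normalizing
```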
[jira] [Updated] (SPARK-26381) Pickle Serialization Error Causing Crash
[ https://issues.apache.org/jira/browse/SPARK-26381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan updated SPARK-26381: - Environment: Tested on two environments: * Spark 2.4.0 - single machine only * Spark 2.3.1 - YARN installation with 5 nodes and files on HDFS The error occurs in both environments. was: Tested on two environments: * Spark 2.4.0 - single machine only * Spark 2.3.1 - YARN installation with 5 nodes and files on HDFS The error occurs in both environments. > Pickle Serialization Error Causing Crash > > > Key: SPARK-26381 > URL: https://issues.apache.org/jira/browse/SPARK-26381 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.1, 2.4.0 > Environment: Tested on two environments: > * Spark 2.4.0 - single machine only > * Spark 2.3.1 - YARN installation with 5 nodes and files on HDFS > The error occurs in both environments. >Reporter: Ryan >Priority: Critical > > There is a pickle serialization error when I try and use AllenNLP for doing > NER within a Spark worker - it is causing a crash. When running on just the > Spark driver or in a standalone program, everything works as expected. 
> > {code:java} > Caused by: org.apache.spark.api.python.PythonException: Traceback (most > recent call last): > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py", > line 217, in main > func, profiler, deserializer, serializer = read_command(pickleSer, infile) > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py", > line 61, in read_command > command = serializer.loads(command.value) > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/serializers.py", > line 559, in loads > return pickle.loads(obj, encoding=encoding) > TypeError: __init__() missing 3 required positional arguments: > 'non_padded_namespaces', 'padding_token', and 'oov_token' > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298) > > at > org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438) > at > org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421) > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252) > > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at > org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) > > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) > at > scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) > at > org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) > at > 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) > at > org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) > > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) > at > org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) > > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) > > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) > > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > ... 1 more > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26381) Pickle Serialization Error Causing Crash
Ryan created SPARK-26381: Summary: Pickle Serialization Error Causing Crash Key: SPARK-26381 URL: https://issues.apache.org/jira/browse/SPARK-26381 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.4.0, 2.3.1 Environment: Tested on two environments: * Spark 2.4.0 - single machine only * Spark 2.3.1 - YARN installation with 5 nodes and files on HDFS The error occurs in both environments. Reporter: Ryan There is a pickle serialization error when I try and use AllenNLP for doing NER within a Spark worker - it is causing a crash. When running on just the Spark driver or in a standalone program, everything works as expected. {code:java} Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py", line 217, in main func, profiler, deserializer, serializer = read_command(pickleSer, infile) File "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py", line 61, in read_command command = serializer.loads(command.value) File "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/serializers.py", line 559, in loads return pickle.loads(obj, encoding=encoding) TypeError: __init__() missing 3 required positional arguments: 'non_padded_namespaces', 'padding_token', and 'oov_token' at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252) at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
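The TypeError surfaces inside pickle.loads when the reconstruction recipe stored in the pickled bytes no longer matches the class definition on the worker, typically a library version skew between the driver (where the object was pickled) and the executors (where it is unpickled). A minimal, self-contained reproduction of that failure mode (illustrative; it does not use AllenNLP's real classes):

```python
import pickle

class Vocab:
    def __init__(self):
        pass
    def __reduce__(self):
        # The pickled bytes carry a recipe that reconstructs via Vocab().
        return (Vocab, ())

blob = pickle.dumps(Vocab())

# Simulate the executor seeing a newer class whose __init__ now requires
# three arguments (the same names as in the AllenNLP traceback above).
def _strict_init(self, non_padded_namespaces, padding_token, oov_token):
    pass
Vocab.__init__ = _strict_init

try:
    pickle.loads(blob)  # the stored recipe calls Vocab() with no arguments
except TypeError as exc:
    print(exc)  # ... missing 3 required positional arguments ...
```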
[jira] [Updated] (SPARK-26380) ExternalShuffleBlockResolver should not block shuffle managers by name.
[ https://issues.apache.org/jira/browse/SPARK-26380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peiyu Zhuang updated SPARK-26380: - Description: We configure a shuffle manager that is compatible with the original Spark shuffle manager (but with a different network/storage implementation) in our environment. It doesn't require any modification to the Spark source code, and it works fine until we enable dynamic allocation. The problem is that the {{ExternalShuffleBlockResolver}} class checks the name of the shuffle manager class and raises an {{UnsupportedOperationException}} if the shuffle manager is not {{SortShuffleManager}} or {{UnsafeShuffleManager}}. Since the shuffle manager is fully configurable, users should be able to decide which shuffle manager to use when dynamic allocation is enabled. Maybe we could change the {{UnsupportedOperationException}} to a warning log telling users that the shuffle manager they are using may not be compatible with dynamic allocation, giving them detailed reasons why. On the other hand, users may still choose to use their own shuffle manager while knowing the risks behind it. was: We configure a shuffle manager that is compatible with the original Spark shuffle manager (but with a different network/storage implementation) in our environment. It doesn't require any modification to the Spark source code, and it works fine until we enable dynamic allocation. The problem is that the {{ExternalShuffleBlockResolver}} class checks the name of the shuffle manager class and raises an {{UnsupportedOperationException}} if the shuffle manager is not {{SortShuffleManager}} or {{UnsafeShuffleManager}}. Since the shuffle manager is fully configurable, users should be able to decide which shuffle manager to use when dynamic allocation is enabled. Maybe we could change the {{UnsupportedOperationException}} to a warning log telling users that the shuffle manager they are using may not be compatible with dynamic allocation, giving them detailed reasons why. On the other hand, users may still choose to use their own shuffle manager while knowing the risks behind it. > ExternalShuffleBlockResolver should not block shuffle managers by name. > --- > > Key: SPARK-26380 > URL: https://issues.apache.org/jira/browse/SPARK-26380 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 2.3.2 >Reporter: Peiyu Zhuang >Priority: Major > > We configure a shuffle manager that is compatible with the original Spark > shuffle manager (but with a different network/storage implementation) in our > environment. It doesn't require any modification to the Spark source code, and it > works fine until we enable dynamic allocation. The problem is that the > {{ExternalShuffleBlockResolver}} class checks the name of the shuffle > manager class and raises an {{UnsupportedOperationException}} if the shuffle > manager is not {{SortShuffleManager}} or {{UnsafeShuffleManager}}. > Since the shuffle manager is fully configurable, users should be able to > decide which shuffle manager to use when dynamic allocation is enabled. > Maybe we could change the {{UnsupportedOperationException}} to a warning log > telling users that the shuffle manager they are using may not be compatible > with dynamic allocation, giving them detailed reasons why. On the other hand, > users may still choose to use their own shuffle manager while knowing the risks > behind it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
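For context, the setup described in SPARK-26380 looks roughly like the following {{spark-defaults.conf}} fragment. This is an illustrative sketch: the shuffle manager class name is hypothetical (any third-party implementation plugged in via {{spark.shuffle.manager}}), while the dynamic-allocation and shuffle-service keys are standard Spark settings. It is this combination — an unrecognized shuffle manager class name plus the external shuffle service required by dynamic allocation — that triggers the {{UnsupportedOperationException}} described above.

```properties
# spark-defaults.conf (illustrative; class name is hypothetical)
spark.shuffle.manager             com.example.shuffle.CompatibleShuffleManager
spark.dynamicAllocation.enabled   true
spark.shuffle.service.enabled     true
```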
[jira] [Commented] (SPARK-26352) join reordering should not change the order of output attributes
[ https://issues.apache.org/jira/browse/SPARK-26352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722779#comment-16722779 ] ASF GitHub Bot commented on SPARK-26352: rednaxelafx opened a new pull request #2: [SPARK-26352][SQL][FOLLOWUP-2.3] Fix missing sameOutput in branch-2.3 URL: https://github.com/apache/spark/pull/2 ## What changes were proposed in this pull request? This is the branch-2.3 equivalent of https://github.com/apache/spark/pull/23330. After https://github.com/apache/spark/pull/23303 was merged to branch-2.3/2.4, the builds on those branches were broken because they were missing the `LogicalPlan.sameOutput` function, which came from https://github.com/apache/spark/pull/22713 and is only available on master. This PR follows up on the broken 2.3/2.4 branches by copying the new `LogicalPlan.sameOutput` into `ReorderJoin` so it is locally available. ## How was this patch tested? Fixes the build of 2.3/2.4. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > join reordering should not change the order of output attributes > > > Key: SPARK-26352 > URL: https://issues.apache.org/jira/browse/SPARK-26352 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Kris Mok >Assignee: Kris Mok >Priority: Major > Fix For: 2.3.3, 2.4.1, 3.0.0 > > > The optimizer rule {{org.apache.spark.sql.catalyst.optimizer.ReorderJoin}} > performs join reordering on inner joins. This was introduced in SPARK-12032 > in 2015-12. > After it had reordered the joins, though, it didn't check whether the > column order (in terms of the {{output}} attribute list) was still the same as > before. Thus, it's possible to have a mismatch between the reordered column > order versus the schema that a DataFrame thinks it has. 
> This can be demonstrated with the example: > {code:none} > spark.sql("create table table_a (x int, y int) using parquet") > spark.sql("create table table_b (i int, j int) using parquet") > spark.sql("create table table_c (a int, b int) using parquet") > val df = spark.sql("with df1 as (select * from table_a cross join table_b) > select * from df1 join table_c on a = x and b = i") > {code} > Here's what the DataFrame thinks: > {code:none} > scala> df.printSchema > root > |-- x: integer (nullable = true) > |-- y: integer (nullable = true) > |-- i: integer (nullable = true) > |-- j: integer (nullable = true) > |-- a: integer (nullable = true) > |-- b: integer (nullable = true) > {code} > Here's what the optimized plan thinks, after join reordering: > {code:none} > scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- > ${a.name}: ${a.dataType.typeName}")) > |-- x: integer > |-- y: integer > |-- a: integer > |-- b: integer > |-- i: integer > |-- j: integer > {code} > If we exclude the {{ReorderJoin}} rule (using Spark 2.4's optimizer rule > exclusion feature), it's back to normal: > {code:none} > scala> spark.conf.set("spark.sql.optimizer.excludedRules", > "org.apache.spark.sql.catalyst.optimizer.ReorderJoin") > scala> val df = spark.sql("with df1 as (select * from table_a cross join > table_b) select * from df1 join table_c on a = x and b = i") > df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields] > scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- > ${a.name}: ${a.dataType.typeName}")) > |-- x: integer > |-- y: integer > |-- i: integer > |-- j: integer > |-- a: integer > |-- b: integer > {code} > Note that this column ordering problem leads to data corruption and can > manifest itself in various symptoms: > * Silently corrupted data: if the reordered columns happen to have > matching or sufficiently compatible types (e.g. all fixed-length > primitive types are considered "sufficiently compatible" in an UnsafeRow), > the resulting data is wrong but might not trigger any alarms > immediately. Or > * Weird Java-level exceptions like {{java.lang.NegativeArraySizeException}}, > or even SIGSEGVs. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
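To see why the mismatch corrupts data silently, here is a toy model in plain Python (not Spark code; the values are invented): rows are emitted in the optimized plan's column order, but consumers decode them positionally against the DataFrame's unchanged schema, so values land under the wrong names without any error.

```python
# What the DataFrame's schema reports, versus the column order the optimized
# plan actually produces after join reordering (from the example above).
dataframe_schema = ["x", "y", "i", "j", "a", "b"]
optimized_order  = ["x", "y", "a", "b", "i", "j"]

# One logical row, with each value tagged by its true column.
physical_row = {"x": 1, "y": 2, "i": 30, "j": 40, "a": 300, "b": 400}

# Values come out in the optimized plan's column order...
emitted = [physical_row[col] for col in optimized_order]

# ...but are decoded positionally against the stale DataFrame schema.
decoded = dict(zip(dataframe_schema, emitted))

print(decoded["i"])  # 300 -- silently reads column "a"'s value
print(decoded["a"])  # 30  -- silently reads column "i"'s value
```

Because every column here is an integer, nothing fails loudly — exactly the "sufficiently compatible types" case described in the bullet above; with incompatible types the same mismatch surfaces as exceptions or SIGSEGVs instead.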
[jira] [Created] (SPARK-26380) ExternalShuffleBlockResolver should not block shuffle managers by name.
Peiyu Zhuang created SPARK-26380: Summary: ExternalShuffleBlockResolver should not block shuffle managers by name. Key: SPARK-26380 URL: https://issues.apache.org/jira/browse/SPARK-26380 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 2.3.2 Reporter: Peiyu Zhuang We configure a shuffle manager that is compatible with the original Spark shuffle manager (but with a different network/storage implementation) in our environment. It doesn't require any modification to the Spark source code, and it works fine until we enable dynamic allocation. The problem is that the {{ExternalShuffleBlockResolver}} class checks the name of the shuffle manager class and raises an {{UnsupportedOperationException}} if the shuffle manager is not {{SortShuffleManager}} or {{UnsafeShuffleManager}}. Since the shuffle manager is fully configurable, users should be able to decide which shuffle manager to use when dynamic allocation is enabled. Maybe we could change the {{UnsupportedOperationException}} to a warning log telling users that the shuffle manager they are using may not be compatible with dynamic allocation, giving them detailed reasons why. On the other hand, users may still choose to use their own shuffle manager while knowing the risks behind it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20636) Eliminate unnecessary shuffle with adjacent Window expressions
[ https://issues.apache.org/jira/browse/SPARK-20636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722775#comment-16722775 ] ASF GitHub Bot commented on SPARK-20636: asfgit closed pull request #23222: [SPARK-20636] Add the rule TransposeWindow to the optimization batch URL: https://github.com/apache/spark/pull/23222 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
{code:none}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f615757a837a1..3eb6bca6ec976 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -73,6 +73,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       CombineLimits,
       CombineUnions,
       // Constant folding and strength reduction
+      TransposeWindow,
       NullPropagation,
       ConstantPropagation,
       FoldablePropagation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 9a5d5a9966ab7..9277dc6859247 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql
 import org.scalatest.Matchers.the
 import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
+import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -668,18 +670,30 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
       ("S2", "P2", 300)
     ).toDF("sno", "pno", "qty")
-    val w1 = Window.partitionBy("sno")
-    val w2 = Window.partitionBy("sno", "pno")
-
-    checkAnswer(
-      df.select($"sno", $"pno", $"qty", sum($"qty").over(w2).alias("sum_qty_2"))
-        .select($"sno", $"pno", $"qty", col("sum_qty_2"), sum("qty").over(w1).alias("sum_qty_1")),
-      Seq(
-        Row("S1", "P1", 100, 800, 800),
-        Row("S1", "P1", 700, 800, 800),
-        Row("S2", "P1", 200, 200, 500),
-        Row("S2", "P2", 300, 300, 500)))
-
+    Seq(true, false).foreach { transposeWindowEnabled =>
+      val excludedRules = if (transposeWindowEnabled) "" else TransposeWindow.ruleName
+      withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> excludedRules) {
+        val w1 = Window.partitionBy("sno")
+        val w2 = Window.partitionBy("sno", "pno")
+
+        val select = df.select($"sno", $"pno", $"qty", sum($"qty").over(w2).alias("sum_qty_2"))
+          .select($"sno", $"pno", $"qty", col("sum_qty_2"), sum("qty").over(w1).alias("sum_qty_1"))
+
+        val expectedNumExchanges = if (transposeWindowEnabled) 1 else 2
+        val actualNumExchanges = select.queryExecution.executedPlan.collect {
+          case e: Exchange => e
+        }.length
+        assert(actualNumExchanges == expectedNumExchanges)
+
+        checkAnswer(
+          select,
+          Seq(
+            Row("S1", "P1", 100, 800, 800),
+            Row("S1", "P1", 700, 800, 800),
+            Row("S2", "P1", 200, 200, 500),
+            Row("S2", "P2", 300, 300, 500)))
+      }
+    }
   }

   test("NaN and -0.0 in window partition keys") {
{code}
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Eliminate unnecessary shuffle with adjacent Window expressions > -- > > Key: SPARK-20636 > URL: https://issues.apache.org/jira/browse/SPARK-20636 > Project: Spark > Issue Type: Improvement > Components: Optimizer >Affects Versions: 2.1.1 >Reporter: Michael Styles >Assignee: Michael Styles >Priority: Major > Fix For: 3.0.0 > > > Consider the following example: > {noformat} > w1 = Window.partitionBy("sno") > w2 = Window.partitionBy("sno", "pno") > supply \ > .select('sno', 'pno', 'qty',