[spark] branch master updated: [SPARK-27243][SQL] RuleExecutor.dumpTimeSpent should not throw exception when empty
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fe317dc [SPARK-27243][SQL] RuleExecutor.dumpTimeSpent should not throw exception when empty fe317dc is described below commit fe317dc74e5fa1509ae9d735485f66724f7292e5 Author: Marco Gaido AuthorDate: Sat Mar 23 09:49:20 2019 +0900 [SPARK-27243][SQL] RuleExecutor.dumpTimeSpent should not throw exception when empty ## What changes were proposed in this pull request? `RuleExecutor.dumpTimeSpent` currently throws an exception when invoked before any rule is run or immediately after `RuleExecutor.reset`. The PR makes it returning an empty summary, which is the expected output instead. ## How was this patch tested? added UT Closes #24180 from mgaido91/SPARK-27243. Authored-by: Marco Gaido Signed-off-by: Hyukjin Kwon --- .../apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala| 6 +- .../org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala | 6 ++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala index e4d5fa9..7a86433 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala @@ -64,7 +64,11 @@ case class QueryExecutionMetering() { /** Dump statistics about time spent running specific rules. 
*/ def dumpTimeSpent(): String = { val map = timeMap.asMap().asScala -val maxLengthRuleNames = map.keys.map(_.toString.length).max +val maxLengthRuleNames = if (map.isEmpty) { + 0 +} else { + map.keys.map(_.toString.length).max +} val colRuleName = "Rule".padTo(maxLengthRuleNames, " ").mkString val colRunTime = "Effective Time / Total Time".padTo(len = 47, " ").mkString diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala index ab5d722..8dbe198 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala @@ -91,4 +91,10 @@ class RuleExecutorSuite extends SparkFunSuite { }.getMessage assert(message.contains("the structural integrity of the plan is broken")) } + + test("SPARK-27243: dumpTimeSpent when no rule has run") { +RuleExecutor.resetMetrics() +// This should not throw an exception +RuleExecutor.dumpTimeSpent() + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-27184][CORE] Avoid hardcoded 'spark.jars', 'spark.files', 'spark.submit.pyFiles' and 'spark.submit.deployMode'
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 68abf77 [SPARK-27184][CORE] Avoid hardcoded 'spark.jars', 'spark.files', 'spark.submit.pyFiles' and 'spark.submit.deployMode' 68abf77 is described below commit 68abf77b1ad8da7916a9dc5fa8bb350b64479410 Author: hehuiyuan AuthorDate: Sat Mar 23 09:43:00 2019 +0900 [SPARK-27184][CORE] Avoid hardcoded 'spark.jars', 'spark.files', 'spark.submit.pyFiles' and 'spark.submit.deployMode' ## What changes were proposed in this pull request? For [SPARK-27184](https://issues.apache.org/jira/browse/SPARK-27184) In the `org.apache.spark.internal.config`, we define the variables of `FILES` and `JARS`, we can use them instead of "spark.jars" and "spark.files". ```scala private[spark] val JARS = ConfigBuilder("spark.jars") .stringConf .toSequence .createWithDefault(Nil) ``` ```scala private[spark] val FILES = ConfigBuilder("spark.files") .stringConf .toSequence .createWithDefault(Nil) ``` Other : In the `org.apache.spark.internal.config`, we define the variables of `SUBMIT_PYTHON_FILES ` and `SUBMIT_DEPLOY_MODE `, we can use them instead of "spark.submit.pyFiles" and "spark.submit.deployMode". ```scala private[spark] val SUBMIT_PYTHON_FILES = ConfigBuilder("spark.submit.pyFiles") .stringConf .toSequence .createWithDefault(Nil) ``` ```scala private[spark] val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode") .stringConf .createWithDefault("client") ``` Closes #24123 from hehuiyuan/hehuiyuan-patch-6. 
Authored-by: hehuiyuan Signed-off-by: Hyukjin Kwon --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 6 +++--- .../main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 8 .../src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5cd6c2b..4abb18d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -394,7 +394,7 @@ class SparkContext(config: SparkConf) extends Logging { _conf.set(EXECUTOR_ID, SparkContext.DRIVER_IDENTIFIER) _jars = Utils.getUserJars(_conf) -_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)) +_files = _conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty)) .toSeq.flatten _eventLogDir = diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b6673e4..b4d7462 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -542,10 +542,10 @@ private[spark] class SparkSubmit extends Logging { OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = CORES_MAX.key), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, -confKey = "spark.files"), - OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"), +confKey = FILES.key), + OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key), OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, -confKey = "spark.jars"), +confKey = JARS.key), OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, 
confKey = DRIVER_MEMORY.key), OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index f8c5330..e7954d1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -183,9 +183,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S .orElse(sparkProperties.get(config.CORES_MAX.key)) .orNull name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull -jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull -files = Option(files).orElse(sparkProperties.get("spark.files")).orNull -pyFiles = Option(pyFiles).orElse(sparkProperties.get("spark.submit.pyFiles")).orNull +
[spark] branch branch-2.3 updated: [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.3 by this push: new 978b68a [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client 978b68a is described below commit 978b68a35d23c094fd005a1fb6e5ebc10e33f8d0 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Fri Mar 22 15:07:49 2019 -0700 [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client This patch fixes the issue that ClientEndpoint in standalone cluster doesn't recognize driver options which are passed to SparkConf instead of system properties. When `Client` is executed via cli they should be provided as system properties, but with `spark-submit` they can be provided as SparkConf. (SparkSubmit will call `ClientApp.start` with SparkConf which would contain these options.) Manually tested via following steps: 1) setup standalone cluster (launch master and worker via `./sbin/start-all.sh`) 2) submit one of example app with standalone cluster mode ``` ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master "spark://localhost:7077" --conf "spark.driver.extraJavaOptions=-Dfoo=BAR" --deploy-mode "cluster" --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1 examples/jars/spark-examples*.jar 10 ``` 3) check whether `foo=BAR` is provided in system properties in Spark UI https://user-images.githubusercontent.com/1317309/54728501-97db1700-4bc1-11e9-89da-078445c71e9b.png;> Closes #24163 from HeartSaVioR/SPARK-26606. 
Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Marcelo Vanzin (cherry picked from commit 8a9eb05137cd4c665f39a54c30d46c0c4eb7d20b) Signed-off-by: Marcelo Vanzin --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 11 --- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index d514509..708910b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -60,6 +60,10 @@ private class ClientEndpoint( private val lostMasters = new HashSet[RpcAddress] private var activeMasterEndpoint: RpcEndpointRef = null + private def getProperty(key: String, conf: SparkConf): Option[String] = { +sys.props.get(key).orElse(conf.getOption(key)) + } + override def onStart(): Unit = { driverArgs.cmd match { case "launch" => @@ -69,18 +73,19 @@ private class ClientEndpoint( val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" val classPathConf = "spark.driver.extraClassPath" -val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp => +val classPathEntries = getProperty(classPathConf, conf).toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } val libraryPathConf = "spark.driver.extraLibraryPath" -val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp => +val libraryPathEntries = getProperty(libraryPathConf, conf).toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } val extraJavaOptsConf = "spark.driver.extraJavaOptions" -val extraJavaOpts = sys.props.get(extraJavaOptsConf) +val extraJavaOpts = getProperty(extraJavaOptsConf, conf) .map(Utils.splitCommandString).getOrElse(Seq.empty) + val sparkJavaOpts = Utils.sparkJavaOpts(conf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = new Command(mainClass, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 6f1a8d8 [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client 6f1a8d8 is described below commit 6f1a8d8bfdd8dccc9af2d144ea5ad644ddc63a81 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Fri Mar 22 15:07:49 2019 -0700 [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client This patch fixes the issue that ClientEndpoint in standalone cluster doesn't recognize driver options which are passed to SparkConf instead of system properties. When `Client` is executed via cli they should be provided as system properties, but with `spark-submit` they can be provided as SparkConf. (SparkSubmit will call `ClientApp.start` with SparkConf which would contain these options.) Manually tested via following steps: 1) setup standalone cluster (launch master and worker via `./sbin/start-all.sh`) 2) submit one of example app with standalone cluster mode ``` ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master "spark://localhost:7077" --conf "spark.driver.extraJavaOptions=-Dfoo=BAR" --deploy-mode "cluster" --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1 examples/jars/spark-examples*.jar 10 ``` 3) check whether `foo=BAR` is provided in system properties in Spark UI https://user-images.githubusercontent.com/1317309/54728501-97db1700-4bc1-11e9-89da-078445c71e9b.png;> Closes #24163 from HeartSaVioR/SPARK-26606. 
Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Marcelo Vanzin (cherry picked from commit 8a9eb05137cd4c665f39a54c30d46c0c4eb7d20b) Signed-off-by: Marcelo Vanzin --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 11 --- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index d514509..708910b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -60,6 +60,10 @@ private class ClientEndpoint( private val lostMasters = new HashSet[RpcAddress] private var activeMasterEndpoint: RpcEndpointRef = null + private def getProperty(key: String, conf: SparkConf): Option[String] = { +sys.props.get(key).orElse(conf.getOption(key)) + } + override def onStart(): Unit = { driverArgs.cmd match { case "launch" => @@ -69,18 +73,19 @@ private class ClientEndpoint( val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" val classPathConf = "spark.driver.extraClassPath" -val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp => +val classPathEntries = getProperty(classPathConf, conf).toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } val libraryPathConf = "spark.driver.extraLibraryPath" -val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp => +val libraryPathEntries = getProperty(libraryPathConf, conf).toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } val extraJavaOptsConf = "spark.driver.extraJavaOptions" -val extraJavaOpts = sys.props.get(extraJavaOptsConf) +val extraJavaOpts = getProperty(extraJavaOptsConf, conf) .map(Utils.splitCommandString).getOrElse(Seq.empty) + val sparkJavaOpts = Utils.sparkJavaOpts(conf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = new Command(mainClass, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8a9eb05 [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client 8a9eb05 is described below commit 8a9eb05137cd4c665f39a54c30d46c0c4eb7d20b Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Fri Mar 22 15:07:49 2019 -0700 [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client ## What changes were proposed in this pull request? This patch fixes the issue that ClientEndpoint in standalone cluster doesn't recognize driver options which are passed to SparkConf instead of system properties. When `Client` is executed via cli they should be provided as system properties, but with `spark-submit` they can be provided as SparkConf. (SparkSubmit will call `ClientApp.start` with SparkConf which would contain these options.) ## How was this patch tested? Manually tested via following steps: 1) setup standalone cluster (launch master and worker via `./sbin/start-all.sh`) 2) submit one of example app with standalone cluster mode ``` ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master "spark://localhost:7077" --conf "spark.driver.extraJavaOptions=-Dfoo=BAR" --deploy-mode "cluster" --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1 examples/jars/spark-examples*.jar 10 ``` 3) check whether `foo=BAR` is provided in system properties in Spark UI https://user-images.githubusercontent.com/1317309/54728501-97db1700-4bc1-11e9-89da-078445c71e9b.png;> Closes #24163 from HeartSaVioR/SPARK-26606. 
Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Marcelo Vanzin --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 11 --- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index e65a494..ea7c902 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -61,6 +61,10 @@ private class ClientEndpoint( private val lostMasters = new HashSet[RpcAddress] private var activeMasterEndpoint: RpcEndpointRef = null + private def getProperty(key: String, conf: SparkConf): Option[String] = { +sys.props.get(key).orElse(conf.getOption(key)) + } + override def onStart(): Unit = { driverArgs.cmd match { case "launch" => @@ -70,18 +74,19 @@ private class ClientEndpoint( val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" val classPathConf = config.DRIVER_CLASS_PATH.key -val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp => +val classPathEntries = getProperty(classPathConf, conf).toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } val libraryPathConf = config.DRIVER_LIBRARY_PATH.key -val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp => +val libraryPathEntries = getProperty(libraryPathConf, conf).toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } val extraJavaOptsConf = config.DRIVER_JAVA_OPTIONS.key -val extraJavaOpts = sys.props.get(extraJavaOptsConf) +val extraJavaOpts = getProperty(extraJavaOptsConf, conf) .map(Utils.splitCommandString).getOrElse(Seq.empty) + val sparkJavaOpts = Utils.sparkJavaOpts(conf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = new Command(mainClass, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-27108][SQL] Add parsed SQL plans for create, CTAS.
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 34e3cc7 [SPARK-27108][SQL] Add parsed SQL plans for create, CTAS. 34e3cc7 is described below commit 34e3cc70602b5107ebeea3f99c7c41672107ca13 Author: Ryan Blue AuthorDate: Fri Mar 22 13:58:54 2019 -0700 [SPARK-27108][SQL] Add parsed SQL plans for create, CTAS. ## What changes were proposed in this pull request? This moves parsing `CREATE TABLE ... USING` statements into catalyst. Catalyst produces logical plans with the parsed information and those plans are converted to v1 `DataSource` plans in `DataSourceAnalysis`. This prepares for adding v2 create plans that should receive the information parsed from SQL without being translated to v1 plans first. This also makes it possible to parse in catalyst instead of breaking the parser across the abstract `AstBuilder` in catalyst and `SparkSqlParser` in core. For more information, see the [mailing list thread](https://lists.apache.org/thread.html/54f4e1929ceb9a2b0cac7cb058000feb8de5d6c667b2e0950804c613%3Cdev.spark.apache.org%3E). ## How was this patch tested? This uses existing tests to catch regressions. This introduces no behavior changes. Closes #24029 from rdblue/SPARK-27108-add-parsed-create-logical-plans. 
Authored-by: Ryan Blue Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/parser/AstBuilder.scala | 192 - .../plans/logical/sql/CreateTableStatement.scala | 66 + .../plans/logical/sql/ParsedStatement.scala| 44 +++ .../spark/sql/catalyst/parser/DDLParserSuite.scala | 318 + .../spark/sql/execution/SparkSqlParser.scala | 233 ++- .../datasources/DataSourceResolution.scala | 112 .../sql/internal/BaseSessionStateBuilder.scala | 1 + .../spark/sql/execution/SparkSqlParserSuite.scala | 13 - .../sql/execution/command/DDLParserSuite.scala | 251 +--- .../execution/command/PlanResolutionSuite.scala| 257 + .../spark/sql/hive/HiveSessionStateBuilder.scala | 1 + 11 files changed, 1030 insertions(+), 458 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 38a61b8..52a5d2c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -30,12 +30,13 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, CreateTableStatement} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp} import 
org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -1888,4 +1889,193 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true) if (STRING == null) structField else structField.withComment(string(STRING)) } + + /** + * Create location string. + */ + override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) { +string(ctx.STRING) + } + + /** + * Create a [[BucketSpec]]. + */ + override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = withOrigin(ctx) { +BucketSpec( + ctx.INTEGER_VALUE.getText.toInt, + visitIdentifierList(ctx.identifierList), + Option(ctx.orderedIdentifierList) + .toSeq + .flatMap(_.orderedIdentifier.asScala) + .map { orderedIdCtx => +Option(orderedIdCtx.ordering).map(_.getText).foreach { dir => + if (dir.toLowerCase(Locale.ROOT) != "asc") { +operationNotAllowed(s"Column ordering must be ASC, was '$dir'", ctx) + } +} + +orderedIdCtx.identifier.getText + }) + } + + /** + * Convert a table property list into a key-value map. + * This should be called
[spark] branch master updated: [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted
This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 78d546f [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted 78d546f is described below commit 78d546fe15aebcbf4b671c44383ddcf82b05b8a7 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Fri Mar 22 11:26:53 2019 -0700 [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted ## What changes were proposed in this pull request? This patch proposes ManifestFileCommitProtocol to clean up incomplete output files in task level if task aborts. Please note that this works as 'best-effort', not kind of guarantee, as we have in HadoopMapReduceCommitProtocol. ## How was this patch tested? Added UT. Closes #24154 from HeartSaVioR/SPARK-27210. Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Shixiong Zhu --- .../streaming/ManifestFileCommitProtocol.scala | 7 -- .../spark/sql/streaming/FileStreamSinkSuite.scala | 29 ++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala index 92191c8..916bd2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -114,7 +114,10 @@ class ManifestFileCommitProtocol(jobId: String, path: String) } override def abortTask(taskContext: TaskAttemptContext): Unit = { -// Do nothing -// TODO: we can also try delete the addedFiles as a best-effort cleanup. 
+// best effort cleanup of incomplete files +if (addedFiles.nonEmpty) { + val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration) + addedFiles.foreach { file => fs.delete(new Path(file), false) } +} } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 619d118..020ab23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -18,8 +18,11 @@ package org.apache.spark.sql.streaming import java.io.File +import java.nio.file.Files import java.util.Locale +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.Path import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} @@ -478,4 +481,30 @@ class FileStreamSinkSuite extends StreamTest { checkDatasetUnorderly(outputDf, 1, 2, 3) } } + + testQuietly("cleanup incomplete output for aborted task") { +withTempDir { tempDir => + val checkpointDir = new File(tempDir, "chk") + val outputDir = new File(tempDir, "output") + val inputData = MemoryStream[Int] + inputData.addData(1, 2, 3) + val q = inputData.toDS().map(_ / 0) +.writeStream +.option("checkpointLocation", checkpointDir.getCanonicalPath) +.format("parquet") +.start(outputDir.getCanonicalPath) + + intercept[StreamingQueryException] { +try { + q.processAllAvailable() +} finally { + q.stop() +} + } + + val outputFiles = Files.walk(outputDir.toPath).iterator().asScala +.filter(_.toString.endsWith(".parquet")) + assert(outputFiles.toList.isEmpty, "Incomplete files should be cleaned up.") +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-27174][SQL] Add support for casting integer types to binary
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8efc5ec [SPARK-27174][SQL] Add support for casting integer types to binary 8efc5ec is described below commit 8efc5ec72e2f5899547941010e22c023d6cb86b3 Author: Martin Junghanns AuthorDate: Fri Mar 22 10:09:35 2019 -0700 [SPARK-27174][SQL] Add support for casting integer types to binary Co-authored-by: Philip Stutz ## What changes were proposed in this pull request? This PR adds support for casting * `ByteType` * `ShortType` * `IntegerType` * `LongType` to `BinaryType`. ## How was this patch tested? We added unit tests for casting instances of the above types. For validation, we used Javas `DataOutputStream` to compare the resulting byte array with the result of `Cast`. We state that the contribution is our original work and that we license the work to the project under the project’s open source license. cloud-fan we'd appreciate a review if you find the time, thx Closes #24107 from s1ck/cast_to_binary. 
Authored-by: Martin Junghanns Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/expressions/Cast.scala | 11 +- .../spark/sql/catalyst/util/NumberConverter.scala | 35 ++ .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 5 - .../sql/catalyst/util/NumberConverterSuite.scala | 48 - .../src/test/resources/sql-tests/inputs/cast.sql | 13 +++ .../test/resources/sql-tests/results/cast.sql.out | 84 +-- .../typeCoercion/native/binaryComparison.sql.out | 120 + .../native/windowFrameCoercion.sql.out | 2 +- .../scala/org/apache/spark/sql/DatasetSuite.scala | 4 +- 9 files changed, 234 insertions(+), 88 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 72cb6b2..848195f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -44,6 +44,7 @@ object Cast { case (_, StringType) => true case (StringType, BinaryType) => true +case (_: IntegralType, BinaryType) => true case (StringType, BooleanType) => true case (DateType, BooleanType) => true @@ -326,6 +327,10 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String // BinaryConverter private[this] def castToBinary(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, _.getBytes) +case ByteType => buildCast[Byte](_, NumberConverter.toBinary) +case ShortType => buildCast[Short](_, NumberConverter.toBinary) +case IntegerType => buildCast[Int](_, NumberConverter.toBinary) +case LongType => buildCast[Long](_, NumberConverter.toBinary) } // UDFToBoolean @@ -908,7 +913,11 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String private[this] def castToBinaryCode(from: DataType): CastFunction = from match { case StringType => - (c, evPrim, evNull) => code"$evPrim = $c.getBytes();" + 
(c, evPrim, evNull) => +code"$evPrim = $c.getBytes();" +case _: IntegralType => + (c, evPrim, evNull) => +code"$evPrim = ${NumberConverter.getClass.getName.stripSuffix("$")}.toBinary($c);" } private[this] def castToDateCode( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala index 9c3f6b7..7dbdd1e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala @@ -169,4 +169,39 @@ object NumberConverter { } UTF8String.fromBytes(java.util.Arrays.copyOfRange(temp, resultStartPos, temp.length)) } + + def toBinary(l: Long): Array[Byte] = { +val result = new Array[Byte](8) +result(0) = (l >>> 56 & 0xFF).toByte +result(1) = (l >>> 48 & 0xFF).toByte +result(2) = (l >>> 40 & 0xFF).toByte +result(3) = (l >>> 32 & 0xFF).toByte +result(4) = (l >>> 24 & 0xFF).toByte +result(5) = (l >>> 16 & 0xFF).toByte +result(6) = (l >>> 8 & 0xFF).toByte +result(7) = (l & 0xFF).toByte +result + } + + def toBinary(i: Int): Array[Byte] = { +val result = new Array[Byte](4) +result(0) = (i >>> 24 & 0xFF).toByte +result(1) = (i >>> 16 & 0xFF).toByte +result(2) = (i >>> 8 & 0xFF).toByte +result(3) = (i & 0xFF).toByte +result + } + + def toBinary(s:
[spark] branch master updated: [SPARK-27141][YARN] Use ConfigEntry for hardcoded configs for Yarn
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8204dc1 [SPARK-27141][YARN] Use ConfigEntry for hardcoded configs for Yarn 8204dc1 is described below commit 8204dc1e548b87aabaf36c5800592bafd44e4419 Author: 10087686 AuthorDate: Fri Mar 22 05:29:29 2019 -0500 [SPARK-27141][YARN] Use ConfigEntry for hardcoded configs for Yarn ## What changes were proposed in this pull request? There is some hardcode configs in code, I think it best to modify。 ## How was this patch tested? Existing tests Closes #24103 from wangjiaochun/yarnHardCode. Authored-by: 10087686 Signed-off-by: Sean Owen --- .../apache/spark/deploy/yarn/ApplicationMaster.scala | 4 ++-- .../spark/deploy/yarn/ApplicationMasterSuite.scala | 3 ++- .../spark/deploy/yarn/BaseYarnClusterSuite.scala | 2 +- .../apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 17 + .../apache/spark/deploy/yarn/YarnClusterSuite.scala| 18 +- .../spark/network/yarn/YarnShuffleServiceSuite.scala | 3 ++- 6 files changed, 25 insertions(+), 22 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 743c2e0..e4b6b3d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -296,8 +296,8 @@ private[spark] class ApplicationMaster( Option(appAttemptId.getApplicationId.toString), None).setCurrentContext() val driverRef = clientRpcEnv.setupEndpointRef( -RpcAddress(sparkConf.get("spark.driver.host"), - sparkConf.get("spark.driver.port").toInt), +RpcAddress(sparkConf.get(DRIVER_HOST_ADDRESS), + sparkConf.get(DRIVER_PORT)), YarnSchedulerBackend.ENDPOINT_NAME) 
// The client-mode AM doesn't listen for incoming connections, so report an invalid port. registerAM(Utils.localHostName, -1, sparkConf, diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterSuite.scala index 695a82f..d9bdace 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.yarn.config._ class ApplicationMasterSuite extends SparkFunSuite { @@ -28,7 +29,7 @@ class ApplicationMasterSuite extends SparkFunSuite { val port = 18080 val sparkConf = new SparkConf() -sparkConf.set("spark.yarn.historyServer.address", +sparkConf.set(HISTORY_SERVER_ADDRESS, "http://${hadoopconf-yarn.resourcemanager.hostname}:${spark.history.ui.port};) val yarnConf = new YarnConfiguration() yarnConf.set("yarn.resourcemanager.hostname", host) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 49367e0..b9aeb1c 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -149,7 +149,7 @@ abstract class BaseYarnClusterSuite launcher.setSparkHome(sys.props("spark.test.home")) .setMaster("yarn") .setDeployMode(deployMode) - .setConf("spark.executor.instances", "1") + .setConf(EXECUTOR_INSTANCES.key, "1") .setPropertiesFile(propsFile) .addAppArgs(appArgs.toArray: _*) diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 53a538d..42b5966 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -33,6 +33,7 @@ import org.scalatest.{BeforeAndAfterEach, Matchers} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.config._ import
[spark] branch master updated: [MINOR][CORE] Leverage modified Utils.classForName to reduce scalastyle off for Class.forName
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 174531c [MINOR][CORE] Leverage modified Utils.classForName to reduce scalastyle off for Class.forName 174531c is described below commit 174531c183d058c6f92330ef1780e5a5c03d34f0 Author: Jungtaek Lim (HeartSaVioR) AuthorDate: Fri Mar 22 05:28:46 2019 -0500 [MINOR][CORE] Leverage modified Utils.classForName to reduce scalastyle off for Class.forName ## What changes were proposed in this pull request? This patch modifies Utils.classForName to have optional parameters - initialize, noSparkClassLoader - to let callers of Class.forName with thread context classloader to use it instead. This helps to reduce scalastyle off for Class.forName. ## How was this patch tested? Existing UTs. Closes #24148 from HeartSaVioR/MINOR-reduce-scalastyle-off-for-class-forname. 
Authored-by: Jungtaek Lim (HeartSaVioR) Signed-off-by: Sean Owen --- .../apache/spark/serializer/KryoSerializer.scala | 35 ++ .../org/apache/spark/util/ClosureCleaner.scala | 14 +++-- .../main/scala/org/apache/spark/util/Utils.scala | 20 + .../test/scala/org/apache/spark/FileSuite.scala| 15 +++--- .../KryoSerializerDistributedSuite.scala | 6 ++-- .../spark/util/MutableURLClassLoaderSuite.scala| 5 +--- 6 files changed, 41 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 2df133d..eef1997 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -130,7 +130,6 @@ class KryoSerializer(conf: SparkConf) val kryo = instantiator.newKryo() kryo.setRegistrationRequired(registrationRequired) -val oldClassLoader = Thread.currentThread.getContextClassLoader val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader) // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops. @@ -156,24 +155,22 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas)) kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas)) -try { - // scalastyle:off classforname - // Use the default classloader when calling the user registrator. - Thread.currentThread.setContextClassLoader(classLoader) - // Register classes given through spark.kryo.classesToRegister. - classesToRegister -.foreach { className => kryo.register(Class.forName(className, true, classLoader)) } - // Allow the user to register their own classes by setting spark.kryo.registrator. - userRegistrators -.map(Class.forName(_, true, classLoader).getConstructor(). 
- newInstance().asInstanceOf[KryoRegistrator]) -.foreach { reg => reg.registerClasses(kryo) } - // scalastyle:on classforname -} catch { - case e: Exception => -throw new SparkException(s"Failed to register classes with Kryo", e) -} finally { - Thread.currentThread.setContextClassLoader(oldClassLoader) +// Use the default classloader when calling the user registrator. +Utils.withContextClassLoader(classLoader) { + try { +// Register classes given through spark.kryo.classesToRegister. +classesToRegister.foreach { className => + kryo.register(Utils.classForName(className, noSparkClassLoader = true)) +} +// Allow the user to register their own classes by setting spark.kryo.registrator. +userRegistrators + .map(Utils.classForName(_, noSparkClassLoader = true).getConstructor(). +newInstance().asInstanceOf[KryoRegistrator]) + .foreach { reg => reg.registerClasses(kryo) } + } catch { +case e: Exception => + throw new SparkException(s"Failed to register classes with Kryo", e) + } } // Register Chill's classes; we do this after our ranges and the user's own classes to let diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 1b3e525..5f725d8 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -378,10 +378,8 @@ private[spark] object ClosureCleaner extends Logging { } else { logDebug(s"Cleaning lambda: ${lambdaFunc.get.getImplMethodName}") - // scalastyle:off classforname - val captClass =
[spark] branch master updated: [SPARK-27212][SQL] Eliminate TimeZone to ZoneId conversion in stringToTimestamp
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a529be2 [SPARK-27212][SQL] Eliminate TimeZone to ZoneId conversion in stringToTimestamp a529be2 is described below commit a529be2930b1d69015f1ac8f85e590f197cf53cf Author: Maxim Gekk AuthorDate: Fri Mar 22 18:01:29 2019 +0900 [SPARK-27212][SQL] Eliminate TimeZone to ZoneId conversion in stringToTimestamp ## What changes were proposed in this pull request? In the PR, I propose to avoid the `TimeZone` to `ZoneId` conversion in `DateTimeUtils.stringToTimestamp` by changing signature of the method, and require a parameter of `ZoneId` type. This will allow to avoid unnecessary conversion (`TimeZone` -> `String` -> `ZoneId`) per each row. Also the PR avoids creation of `ZoneId` instances from `ZoneOffset` because `ZoneOffset` is a sub-class, and the conversion is unnecessary too. ## How was this patch tested? It was tested by `DateTimeUtilsSuite` and `CastSuite`. Closes #24155 from MaxGekk/stringtotimestamp-zoneid. 
Authored-by: Maxim Gekk Signed-off-by: Hyukjin Kwon --- .../spark/sql/catalyst/expressions/Cast.scala | 6 +-- .../spark/sql/catalyst/parser/AstBuilder.scala | 6 +-- .../spark/sql/catalyst/util/DateTimeUtils.scala| 8 ++-- .../expressions/HashExpressionsSuite.scala | 8 ++-- .../sql/catalyst/util/DateTimeUtilsSuite.scala | 46 +++--- .../spark/sql/catalyst/util/UnsafeArraySuite.scala | 9 - .../execution/datasources/jdbc/JDBCRelation.scala | 4 +- 7 files changed, 47 insertions(+), 40 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index a70ed6d..72cb6b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -364,7 +364,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String // TimestampConverter private[this] def castToTimestamp(from: DataType): Any => Any = from match { case StringType => - buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs, timeZone).orNull) + buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs, zoneId).orNull) case BooleanType => buildCast[Boolean](_, b => if (b) 1L else 0) case LongType => @@ -1017,12 +1017,12 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String from: DataType, ctx: CodegenContext): CastFunction = from match { case StringType => - val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass) + val zid = ctx.addReferenceObj("zoneId", zoneId, "java.time.ZoneId") val longOpt = ctx.freshVariable("longOpt", classOf[Option[Long]]) (c, evPrim, evNull) => code""" scala.Option $longOpt = - org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp($c, $tz); + org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp($c, $zid); if 
($longOpt.isDefined()) { $evPrim = ((Long) $longOpt.get()).longValue(); } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index b1d6be5..38a61b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getTimeZone, stringToDate, stringToTimestamp} +import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -1593,8 +1593,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging valueType match { case "DATE" => toLiteral(stringToDate, DateType) case "TIMESTAMP" => - val timeZone = getTimeZone(SQLConf.get.sessionLocalTimeZone) - toLiteral(stringToTimestamp(_, timeZone), TimestampType) + val zoneId = getZoneId(SQLConf.get.sessionLocalTimeZone) +