[spark] branch master updated: [SPARK-27243][SQL] RuleExecutor.dumpTimeSpent should not throw exception when empty

2019-03-22 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fe317dc  [SPARK-27243][SQL] RuleExecutor.dumpTimeSpent should not 
throw exception when empty
fe317dc is described below

commit fe317dc74e5fa1509ae9d735485f66724f7292e5
Author: Marco Gaido 
AuthorDate: Sat Mar 23 09:49:20 2019 +0900

[SPARK-27243][SQL] RuleExecutor.dumpTimeSpent should not throw exception 
when empty

## What changes were proposed in this pull request?

`RuleExecutor.dumpTimeSpent` currently throws an exception when invoked 
before any rule is run or immediately after `RuleExecutor.reset`. The PR makes 
it returning an empty summary, which is the expected output instead.

## How was this patch tested?

added UT

Closes #24180 from mgaido91/SPARK-27243.

Authored-by: Marco Gaido 
Signed-off-by: Hyukjin Kwon 
---
 .../apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala| 6 +-
 .../org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala | 6 ++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
index e4d5fa9..7a86433 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
@@ -64,7 +64,11 @@ case class QueryExecutionMetering() {
   /** Dump statistics about time spent running specific rules. */
   def dumpTimeSpent(): String = {
 val map = timeMap.asMap().asScala
-val maxLengthRuleNames = map.keys.map(_.toString.length).max
+val maxLengthRuleNames = if (map.isEmpty) {
+  0
+} else {
+  map.keys.map(_.toString.length).max
+}
 
 val colRuleName = "Rule".padTo(maxLengthRuleNames, " ").mkString
 val colRunTime = "Effective Time / Total Time".padTo(len = 47, " 
").mkString
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
index ab5d722..8dbe198 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
@@ -91,4 +91,10 @@ class RuleExecutorSuite extends SparkFunSuite {
 }.getMessage
 assert(message.contains("the structural integrity of the plan is broken"))
   }
+
+  test("SPARK-27243: dumpTimeSpent when no rule has run") {
+RuleExecutor.resetMetrics()
+// This should not throw an exception
+RuleExecutor.dumpTimeSpent()
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-27184][CORE] Avoid hardcoded 'spark.jars', 'spark.files', 'spark.submit.pyFiles' and 'spark.submit.deployMode'

2019-03-22 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 68abf77  [SPARK-27184][CORE] Avoid hardcoded 'spark.jars', 
'spark.files', 'spark.submit.pyFiles' and 'spark.submit.deployMode'
68abf77 is described below

commit 68abf77b1ad8da7916a9dc5fa8bb350b64479410
Author: hehuiyuan 
AuthorDate: Sat Mar 23 09:43:00 2019 +0900

[SPARK-27184][CORE] Avoid hardcoded 'spark.jars', 'spark.files', 
'spark.submit.pyFiles' and 'spark.submit.deployMode'

## What changes were proposed in this pull request?

For [SPARK-27184](https://issues.apache.org/jira/browse/SPARK-27184)

In the `org.apache.spark.internal.config`, we define the variables of 
`FILES` and `JARS`, we can use them instead of "spark.jars" and "spark.files".

```scala
private[spark] val JARS = ConfigBuilder("spark.jars")
  .stringConf
  .toSequence
  .createWithDefault(Nil)
```

```scala
private[spark] val FILES = ConfigBuilder("spark.files")
  .stringConf
  .toSequence
  .createWithDefault(Nil)
```

Other :
In the `org.apache.spark.internal.config`, we define the variables of 
`SUBMIT_PYTHON_FILES ` and `SUBMIT_DEPLOY_MODE `, we can use them instead of 
"spark.submit.pyFiles" and "spark.submit.deployMode".
```scala
private[spark] val SUBMIT_PYTHON_FILES = 
ConfigBuilder("spark.submit.pyFiles")
.stringConf
.toSequence
.createWithDefault(Nil)

```
```scala
private[spark] val SUBMIT_DEPLOY_MODE = 
ConfigBuilder("spark.submit.deployMode")
.stringConf
.createWithDefault("client")
```

Closes #24123 from hehuiyuan/hehuiyuan-patch-6.

Authored-by: hehuiyuan 
Signed-off-by: Hyukjin Kwon 
---
 core/src/main/scala/org/apache/spark/SparkContext.scala   | 2 +-
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 6 +++---
 .../main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 8 
 .../src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 2 +-
 4 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5cd6c2b..4abb18d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -394,7 +394,7 @@ class SparkContext(config: SparkConf) extends Logging {
 _conf.set(EXECUTOR_ID, SparkContext.DRIVER_IDENTIFIER)
 
 _jars = Utils.getUserJars(_conf)
-_files = 
_conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
+_files = 
_conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty))
   .toSeq.flatten
 
 _eventLogDir =
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b6673e4..b4d7462 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -542,10 +542,10 @@ private[spark] class SparkSubmit extends Logging {
   OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, 
ALL_DEPLOY_MODES,
 confKey = CORES_MAX.key),
   OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, 
ALL_DEPLOY_MODES,
-confKey = "spark.files"),
-  OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
+confKey = FILES.key),
+  OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
   OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, 
ALL_DEPLOY_MODES,
-confKey = "spark.jars"),
+confKey = JARS.key),
   OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | 
KUBERNETES, CLUSTER,
 confKey = DRIVER_MEMORY.key),
   OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, 
CLUSTER,
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index f8c5330..e7954d1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -183,9 +183,9 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
   .orElse(sparkProperties.get(config.CORES_MAX.key))
   .orNull
 name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
-jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
-files = Option(files).orElse(sparkProperties.get("spark.files")).orNull
-pyFiles = 
Option(pyFiles).orElse(sparkProperties.get("spark.submit.pyFiles")).orNull
+ 

[spark] branch branch-2.3 updated: [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client

2019-03-22 Thread vanzin
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new 978b68a  [SPARK-26606][CORE] Handle driver options properly when 
submitting to standalone cluster mode via legacy Client
978b68a is described below

commit 978b68a35d23c094fd005a1fb6e5ebc10e33f8d0
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Fri Mar 22 15:07:49 2019 -0700

[SPARK-26606][CORE] Handle driver options properly when submitting to 
standalone cluster mode via legacy Client

This patch fixes the issue that ClientEndpoint in standalone cluster 
doesn't recognize about driver options which are passed to SparkConf instead of 
system properties. When `Client` is executed via cli they should be provided as 
system properties, but with `spark-submit` they can be provided as SparkConf. 
(SpartSubmit will call `ClientApp.start` with SparkConf which would contain 
these options.)

Manually tested via following steps:

1) setup standalone cluster (launch master and worker via 
`./sbin/start-all.sh`)

2) submit one of example app with standalone cluster mode

```
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
"spark://localhost:7077" --conf "spark.driver.extraJavaOptions=-Dfoo=BAR" 
--deploy-mode "cluster" --num-executors 1 --driver-memory 512m 
--executor-memory 512m --executor-cores 1 examples/jars/spark-examples*.jar 10
```

3) check whether `foo=BAR` is provided in system properties in Spark UI

https://user-images.githubusercontent.com/1317309/54728501-97db1700-4bc1-11e9-89da-078445c71e9b.png;>

Closes #24163 from HeartSaVioR/SPARK-26606.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Marcelo Vanzin 
(cherry picked from commit 8a9eb05137cd4c665f39a54c30d46c0c4eb7d20b)
Signed-off-by: Marcelo Vanzin 
---
 core/src/main/scala/org/apache/spark/deploy/Client.scala | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index d514509..708910b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -60,6 +60,10 @@ private class ClientEndpoint(
private val lostMasters = new HashSet[RpcAddress]
private var activeMasterEndpoint: RpcEndpointRef = null
 
+  private def getProperty(key: String, conf: SparkConf): Option[String] = {
+sys.props.get(key).orElse(conf.getOption(key))
+  }
+
   override def onStart(): Unit = {
 driverArgs.cmd match {
   case "launch" =>
@@ -69,18 +73,19 @@ private class ClientEndpoint(
 val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
 
 val classPathConf = "spark.driver.extraClassPath"
-val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp 
=>
+val classPathEntries = getProperty(classPathConf, conf).toSeq.flatMap 
{ cp =>
   cp.split(java.io.File.pathSeparator)
 }
 
 val libraryPathConf = "spark.driver.extraLibraryPath"
-val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap 
{ cp =>
+val libraryPathEntries = getProperty(libraryPathConf, 
conf).toSeq.flatMap { cp =>
   cp.split(java.io.File.pathSeparator)
 }
 
 val extraJavaOptsConf = "spark.driver.extraJavaOptions"
-val extraJavaOpts = sys.props.get(extraJavaOptsConf)
+val extraJavaOpts = getProperty(extraJavaOptsConf, conf)
   .map(Utils.splitCommandString).getOrElse(Seq.empty)
+
 val sparkJavaOpts = Utils.sparkJavaOpts(conf)
 val javaOpts = sparkJavaOpts ++ extraJavaOpts
 val command = new Command(mainClass,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client

2019-03-22 Thread vanzin
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 6f1a8d8  [SPARK-26606][CORE] Handle driver options properly when 
submitting to standalone cluster mode via legacy Client
6f1a8d8 is described below

commit 6f1a8d8bfdd8dccc9af2d144ea5ad644ddc63a81
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Fri Mar 22 15:07:49 2019 -0700

[SPARK-26606][CORE] Handle driver options properly when submitting to 
standalone cluster mode via legacy Client

This patch fixes the issue that ClientEndpoint in standalone cluster 
doesn't recognize about driver options which are passed to SparkConf instead of 
system properties. When `Client` is executed via cli they should be provided as 
system properties, but with `spark-submit` they can be provided as SparkConf. 
(SpartSubmit will call `ClientApp.start` with SparkConf which would contain 
these options.)

Manually tested via following steps:

1) setup standalone cluster (launch master and worker via 
`./sbin/start-all.sh`)

2) submit one of example app with standalone cluster mode

```
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
"spark://localhost:7077" --conf "spark.driver.extraJavaOptions=-Dfoo=BAR" 
--deploy-mode "cluster" --num-executors 1 --driver-memory 512m 
--executor-memory 512m --executor-cores 1 examples/jars/spark-examples*.jar 10
```

3) check whether `foo=BAR` is provided in system properties in Spark UI

https://user-images.githubusercontent.com/1317309/54728501-97db1700-4bc1-11e9-89da-078445c71e9b.png;>

Closes #24163 from HeartSaVioR/SPARK-26606.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Marcelo Vanzin 
(cherry picked from commit 8a9eb05137cd4c665f39a54c30d46c0c4eb7d20b)
Signed-off-by: Marcelo Vanzin 
---
 core/src/main/scala/org/apache/spark/deploy/Client.scala | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index d514509..708910b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -60,6 +60,10 @@ private class ClientEndpoint(
private val lostMasters = new HashSet[RpcAddress]
private var activeMasterEndpoint: RpcEndpointRef = null
 
+  private def getProperty(key: String, conf: SparkConf): Option[String] = {
+sys.props.get(key).orElse(conf.getOption(key))
+  }
+
   override def onStart(): Unit = {
 driverArgs.cmd match {
   case "launch" =>
@@ -69,18 +73,19 @@ private class ClientEndpoint(
 val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
 
 val classPathConf = "spark.driver.extraClassPath"
-val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp 
=>
+val classPathEntries = getProperty(classPathConf, conf).toSeq.flatMap 
{ cp =>
   cp.split(java.io.File.pathSeparator)
 }
 
 val libraryPathConf = "spark.driver.extraLibraryPath"
-val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap 
{ cp =>
+val libraryPathEntries = getProperty(libraryPathConf, 
conf).toSeq.flatMap { cp =>
   cp.split(java.io.File.pathSeparator)
 }
 
 val extraJavaOptsConf = "spark.driver.extraJavaOptions"
-val extraJavaOpts = sys.props.get(extraJavaOptsConf)
+val extraJavaOpts = getProperty(extraJavaOptsConf, conf)
   .map(Utils.splitCommandString).getOrElse(Seq.empty)
+
 val sparkJavaOpts = Utils.sparkJavaOpts(conf)
 val javaOpts = sparkJavaOpts ++ extraJavaOpts
 val command = new Command(mainClass,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-26606][CORE] Handle driver options properly when submitting to standalone cluster mode via legacy Client

2019-03-22 Thread vanzin
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8a9eb05  [SPARK-26606][CORE] Handle driver options properly when 
submitting to standalone cluster mode via legacy Client
8a9eb05 is described below

commit 8a9eb05137cd4c665f39a54c30d46c0c4eb7d20b
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Fri Mar 22 15:07:49 2019 -0700

[SPARK-26606][CORE] Handle driver options properly when submitting to 
standalone cluster mode via legacy Client

## What changes were proposed in this pull request?

This patch fixes the issue that ClientEndpoint in standalone cluster 
doesn't recognize about driver options which are passed to SparkConf instead of 
system properties. When `Client` is executed via cli they should be provided as 
system properties, but with `spark-submit` they can be provided as SparkConf. 
(SpartSubmit will call `ClientApp.start` with SparkConf which would contain 
these options.)

## How was this patch tested?

Manually tested via following steps:

1) setup standalone cluster (launch master and worker via 
`./sbin/start-all.sh`)

2) submit one of example app with standalone cluster mode

```
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
"spark://localhost:7077" --conf "spark.driver.extraJavaOptions=-Dfoo=BAR" 
--deploy-mode "cluster" --num-executors 1 --driver-memory 512m 
--executor-memory 512m --executor-cores 1 examples/jars/spark-examples*.jar 10
```

3) check whether `foo=BAR` is provided in system properties in Spark UI

https://user-images.githubusercontent.com/1317309/54728501-97db1700-4bc1-11e9-89da-078445c71e9b.png;>

Closes #24163 from HeartSaVioR/SPARK-26606.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Marcelo Vanzin 
---
 core/src/main/scala/org/apache/spark/deploy/Client.scala | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index e65a494..ea7c902 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -61,6 +61,10 @@ private class ClientEndpoint(
private val lostMasters = new HashSet[RpcAddress]
private var activeMasterEndpoint: RpcEndpointRef = null
 
+  private def getProperty(key: String, conf: SparkConf): Option[String] = {
+sys.props.get(key).orElse(conf.getOption(key))
+  }
+
   override def onStart(): Unit = {
 driverArgs.cmd match {
   case "launch" =>
@@ -70,18 +74,19 @@ private class ClientEndpoint(
 val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
 
 val classPathConf = config.DRIVER_CLASS_PATH.key
-val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp 
=>
+val classPathEntries = getProperty(classPathConf, conf).toSeq.flatMap 
{ cp =>
   cp.split(java.io.File.pathSeparator)
 }
 
 val libraryPathConf = config.DRIVER_LIBRARY_PATH.key
-val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap 
{ cp =>
+val libraryPathEntries = getProperty(libraryPathConf, 
conf).toSeq.flatMap { cp =>
   cp.split(java.io.File.pathSeparator)
 }
 
 val extraJavaOptsConf = config.DRIVER_JAVA_OPTIONS.key
-val extraJavaOpts = sys.props.get(extraJavaOptsConf)
+val extraJavaOpts = getProperty(extraJavaOptsConf, conf)
   .map(Utils.splitCommandString).getOrElse(Seq.empty)
+
 val sparkJavaOpts = Utils.sparkJavaOpts(conf)
 val javaOpts = sparkJavaOpts ++ extraJavaOpts
 val command = new Command(mainClass,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-27108][SQL] Add parsed SQL plans for create, CTAS.

2019-03-22 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 34e3cc7  [SPARK-27108][SQL] Add parsed SQL plans for create, CTAS.
34e3cc7 is described below

commit 34e3cc70602b5107ebeea3f99c7c41672107ca13
Author: Ryan Blue 
AuthorDate: Fri Mar 22 13:58:54 2019 -0700

[SPARK-27108][SQL] Add parsed SQL plans for create, CTAS.

## What changes were proposed in this pull request?

This moves parsing `CREATE TABLE ... USING` statements into catalyst. 
Catalyst produces logical plans with the parsed information and those plans are 
converted to v1 `DataSource` plans in `DataSourceAnalysis`.

This prepares for adding v2 create plans that should receive the 
information parsed from SQL without being translated to v1 plans first.

This also makes it possible to parse in catalyst instead of breaking the 
parser across the abstract `AstBuilder` in catalyst and `SparkSqlParser` in 
core.

For more information, see the [mailing list 
thread](https://lists.apache.org/thread.html/54f4e1929ceb9a2b0cac7cb058000feb8de5d6c667b2e0950804c613%3Cdev.spark.apache.org%3E).

## How was this patch tested?

This uses existing tests to catch regressions. This introduces no behavior 
changes.

Closes #24029 from rdblue/SPARK-27108-add-parsed-create-logical-plans.

Authored-by: Ryan Blue 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/parser/AstBuilder.scala | 192 -
 .../plans/logical/sql/CreateTableStatement.scala   |  66 +
 .../plans/logical/sql/ParsedStatement.scala|  44 +++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 318 +
 .../spark/sql/execution/SparkSqlParser.scala   | 233 ++-
 .../datasources/DataSourceResolution.scala | 112 
 .../sql/internal/BaseSessionStateBuilder.scala |   1 +
 .../spark/sql/execution/SparkSqlParserSuite.scala  |  13 -
 .../sql/execution/command/DDLParserSuite.scala | 251 +---
 .../execution/command/PlanResolutionSuite.scala| 257 +
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |   1 +
 11 files changed, 1030 insertions(+), 458 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 38a61b8..52a5d2c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -30,12 +30,13 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import 
org.apache.spark.sql.catalyst.plans.logical.sql.{CreateTableAsSelectStatement, 
CreateTableStatement}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, 
stringToDate, stringToTimestamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -1888,4 +1889,193 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 val structField = StructField(identifier.getText, typedVisit(dataType), 
nullable = true)
 if (STRING == null) structField else 
structField.withComment(string(STRING))
   }
+
+  /**
+   * Create location string.
+   */
+  override def visitLocationSpec(ctx: LocationSpecContext): String = 
withOrigin(ctx) {
+string(ctx.STRING)
+  }
+
+  /**
+   * Create a [[BucketSpec]].
+   */
+  override def visitBucketSpec(ctx: BucketSpecContext): BucketSpec = 
withOrigin(ctx) {
+BucketSpec(
+  ctx.INTEGER_VALUE.getText.toInt,
+  visitIdentifierList(ctx.identifierList),
+  Option(ctx.orderedIdentifierList)
+  .toSeq
+  .flatMap(_.orderedIdentifier.asScala)
+  .map { orderedIdCtx =>
+Option(orderedIdCtx.ordering).map(_.getText).foreach { dir =>
+  if (dir.toLowerCase(Locale.ROOT) != "asc") {
+operationNotAllowed(s"Column ordering must be ASC, was 
'$dir'", ctx)
+  }
+}
+
+orderedIdCtx.identifier.getText
+  })
+  }
+
+  /**
+   * Convert a table property list into a key-value map.
+   * This should be called 

[spark] branch master updated: [SPARK-27210][SS] Cleanup incomplete output files in ManifestFileCommitProtocol if task is aborted

2019-03-22 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 78d546f  [SPARK-27210][SS] Cleanup incomplete output files in 
ManifestFileCommitProtocol if task is aborted
78d546f is described below

commit 78d546fe15aebcbf4b671c44383ddcf82b05b8a7
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Fri Mar 22 11:26:53 2019 -0700

[SPARK-27210][SS] Cleanup incomplete output files in 
ManifestFileCommitProtocol if task is aborted

## What changes were proposed in this pull request?

This patch proposes ManifestFileCommitProtocol to clean up incomplete 
output files in task level if task aborts. Please note that this works as 
'best-effort', not kind of guarantee, as we have in 
HadoopMapReduceCommitProtocol.

## How was this patch tested?

Added UT.

Closes #24154 from HeartSaVioR/SPARK-27210.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Shixiong Zhu 
---
 .../streaming/ManifestFileCommitProtocol.scala |  7 --
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 29 ++
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 92191c8..916bd2d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -114,7 +114,10 @@ class ManifestFileCommitProtocol(jobId: String, path: 
String)
   }
 
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
-// Do nothing
-// TODO: we can also try delete the addedFiles as a best-effort cleanup.
+// best effort cleanup of incomplete files
+if (addedFiles.nonEmpty) {
+  val fs = new 
Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
+  addedFiles.foreach { file => fs.delete(new Path(file), false) }
+}
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 619d118..020ab23 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.nio.file.Files
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
@@ -478,4 +481,30 @@ class FileStreamSinkSuite extends StreamTest {
   checkDatasetUnorderly(outputDf, 1, 2, 3)
 }
   }
+
+  testQuietly("cleanup incomplete output for aborted task") {
+withTempDir { tempDir =>
+  val checkpointDir = new File(tempDir, "chk")
+  val outputDir = new File(tempDir, "output")
+  val inputData = MemoryStream[Int]
+  inputData.addData(1, 2, 3)
+  val q = inputData.toDS().map(_ / 0)
+.writeStream
+.option("checkpointLocation", checkpointDir.getCanonicalPath)
+.format("parquet")
+.start(outputDir.getCanonicalPath)
+
+  intercept[StreamingQueryException] {
+try {
+  q.processAllAvailable()
+} finally {
+  q.stop()
+}
+  }
+
+  val outputFiles = Files.walk(outputDir.toPath).iterator().asScala
+.filter(_.toString.endsWith(".parquet"))
+  assert(outputFiles.toList.isEmpty, "Incomplete files should be cleaned 
up.")
+}
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-27174][SQL] Add support for casting integer types to binary

2019-03-22 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8efc5ec  [SPARK-27174][SQL] Add support for casting integer types to 
binary
8efc5ec is described below

commit 8efc5ec72e2f5899547941010e22c023d6cb86b3
Author: Martin Junghanns 
AuthorDate: Fri Mar 22 10:09:35 2019 -0700

[SPARK-27174][SQL] Add support for casting integer types to binary

Co-authored-by: Philip Stutz 

## What changes were proposed in this pull request?

This PR adds support for casting

* `ByteType`
* `ShortType`
* `IntegerType`
* `LongType`

to `BinaryType`.

## How was this patch tested?

We added unit tests for casting instances of the above types. For 
validation, we used Javas `DataOutputStream` to compare the resulting byte 
array with the result of `Cast`.

We state that the contribution is our original work and that we license the 
work to the project under the project’s open source license.

cloud-fan we'd appreciate a review if you find the time, thx

Closes #24107 from s1ck/cast_to_binary.

Authored-by: Martin Junghanns 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/expressions/Cast.scala  |  11 +-
 .../spark/sql/catalyst/util/NumberConverter.scala  |  35 ++
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala |   5 -
 .../sql/catalyst/util/NumberConverterSuite.scala   |  48 -
 .../src/test/resources/sql-tests/inputs/cast.sql   |  13 +++
 .../test/resources/sql-tests/results/cast.sql.out  |  84 +--
 .../typeCoercion/native/binaryComparison.sql.out   | 120 +
 .../native/windowFrameCoercion.sql.out |   2 +-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |   4 +-
 9 files changed, 234 insertions(+), 88 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 72cb6b2..848195f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -44,6 +44,7 @@ object Cast {
 case (_, StringType) => true
 
 case (StringType, BinaryType) => true
+case (_: IntegralType, BinaryType) => true
 
 case (StringType, BooleanType) => true
 case (DateType, BooleanType) => true
@@ -326,6 +327,10 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
   // BinaryConverter
   private[this] def castToBinary(from: DataType): Any => Any = from match {
 case StringType => buildCast[UTF8String](_, _.getBytes)
+case ByteType => buildCast[Byte](_, NumberConverter.toBinary)
+case ShortType => buildCast[Short](_, NumberConverter.toBinary)
+case IntegerType => buildCast[Int](_, NumberConverter.toBinary)
+case LongType => buildCast[Long](_, NumberConverter.toBinary)
   }
 
   // UDFToBoolean
@@ -908,7 +913,11 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
 
   private[this] def castToBinaryCode(from: DataType): CastFunction = from 
match {
 case StringType =>
-  (c, evPrim, evNull) => code"$evPrim = $c.getBytes();"
+  (c, evPrim, evNull) =>
+code"$evPrim = $c.getBytes();"
+case _: IntegralType =>
+  (c, evPrim, evNull) =>
+code"$evPrim = 
${NumberConverter.getClass.getName.stripSuffix("$")}.toBinary($c);"
   }
 
   private[this] def castToDateCode(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala
index 9c3f6b7..7dbdd1e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/NumberConverter.scala
@@ -169,4 +169,39 @@ object NumberConverter {
 }
 UTF8String.fromBytes(java.util.Arrays.copyOfRange(temp, resultStartPos, 
temp.length))
   }
+
+  def toBinary(l: Long): Array[Byte] = {
+val result = new Array[Byte](8)
+result(0) = (l >>> 56 & 0xFF).toByte
+result(1) = (l >>> 48 & 0xFF).toByte
+result(2) = (l >>> 40 & 0xFF).toByte
+result(3) = (l >>> 32 & 0xFF).toByte
+result(4) = (l >>> 24 & 0xFF).toByte
+result(5) = (l >>> 16 & 0xFF).toByte
+result(6) = (l >>> 8 & 0xFF).toByte
+result(7) = (l & 0xFF).toByte
+result
+  }
+
+  def toBinary(i: Int): Array[Byte] = {
+val result = new Array[Byte](4)
+result(0) = (i >>> 24 & 0xFF).toByte
+result(1) = (i >>> 16 & 0xFF).toByte
+result(2) = (i >>> 8 & 0xFF).toByte
+result(3) = (i & 0xFF).toByte
+result
+  }
+
+  def toBinary(s: 

[spark] branch master updated: [SPARK-27141][YARN] Use ConfigEntry for hardcoded configs for Yarn

2019-03-22 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8204dc1  [SPARK-27141][YARN] Use ConfigEntry for hardcoded configs for 
Yarn
8204dc1 is described below

commit 8204dc1e548b87aabaf36c5800592bafd44e4419
Author: 10087686 
AuthorDate: Fri Mar 22 05:29:29 2019 -0500

[SPARK-27141][YARN] Use ConfigEntry for hardcoded configs for Yarn

## What changes were proposed in this pull request?
There is some hardcode configs in code, I think it best to modify。

## How was this patch tested?
Existing tests

Closes #24103 from wangjiaochun/yarnHardCode.

Authored-by: 10087686 
Signed-off-by: Sean Owen 
---
 .../apache/spark/deploy/yarn/ApplicationMaster.scala   |  4 ++--
 .../spark/deploy/yarn/ApplicationMasterSuite.scala |  3 ++-
 .../spark/deploy/yarn/BaseYarnClusterSuite.scala   |  2 +-
 .../apache/spark/deploy/yarn/YarnAllocatorSuite.scala  | 17 +
 .../apache/spark/deploy/yarn/YarnClusterSuite.scala| 18 +-
 .../spark/network/yarn/YarnShuffleServiceSuite.scala   |  3 ++-
 6 files changed, 25 insertions(+), 22 deletions(-)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 743c2e0..e4b6b3d 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -296,8 +296,8 @@ private[spark] class ApplicationMaster(
 Option(appAttemptId.getApplicationId.toString), 
None).setCurrentContext()
 
   val driverRef = clientRpcEnv.setupEndpointRef(
-RpcAddress(sparkConf.get("spark.driver.host"),
-  sparkConf.get("spark.driver.port").toInt),
+RpcAddress(sparkConf.get(DRIVER_HOST_ADDRESS),
+  sparkConf.get(DRIVER_PORT)),
 YarnSchedulerBackend.ENDPOINT_NAME)
   // The client-mode AM doesn't listen for incoming connections, so report 
an invalid port.
   registerAM(Utils.localHostName, -1, sparkConf,
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterSuite.scala
index 695a82f..d9bdace 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
 
 class ApplicationMasterSuite extends SparkFunSuite {
 
@@ -28,7 +29,7 @@ class ApplicationMasterSuite extends SparkFunSuite {
 val port = 18080
 val sparkConf = new SparkConf()
 
-sparkConf.set("spark.yarn.historyServer.address",
+sparkConf.set(HISTORY_SERVER_ADDRESS,
   
"http://${hadoopconf-yarn.resourcemanager.hostname}:${spark.history.ui.port};)
 val yarnConf = new YarnConfiguration()
 yarnConf.set("yarn.resourcemanager.hostname", host)
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 49367e0..b9aeb1c 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -149,7 +149,7 @@ abstract class BaseYarnClusterSuite
 launcher.setSparkHome(sys.props("spark.test.home"))
   .setMaster("yarn")
   .setDeployMode(deployMode)
-  .setConf("spark.executor.instances", "1")
+  .setConf(EXECUTOR_INSTANCES.key, "1")
   .setPropertiesFile(propsFile)
   .addAppArgs(appArgs.toArray: _*)
 
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 53a538d..42b5966 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.{BeforeAndAfterEach, Matchers}
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import 

[spark] branch master updated: [MINOR][CORE] Leverage modified Utils.classForName to reduce scalastyle off for Class.forName

2019-03-22 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 174531c  [MINOR][CORE] Leverage modified Utils.classForName to reduce 
scalastyle off for Class.forName
174531c is described below

commit 174531c183d058c6f92330ef1780e5a5c03d34f0
Author: Jungtaek Lim (HeartSaVioR) 
AuthorDate: Fri Mar 22 05:28:46 2019 -0500

[MINOR][CORE] Leverage modified Utils.classForName to reduce scalastyle off 
for Class.forName

## What changes were proposed in this pull request?

This patch modifies Utils.classForName to have optional parameters - 
initialize, noSparkClassLoader - to let callers of Class.forName with thread 
context classloader to use it instead. This helps to reduce scalastyle off for 
Class.forName.

## How was this patch tested?

Existing UTs.

Closes #24148 from 
HeartSaVioR/MINOR-reduce-scalastyle-off-for-class-forname.

Authored-by: Jungtaek Lim (HeartSaVioR) 
Signed-off-by: Sean Owen 
---
 .../apache/spark/serializer/KryoSerializer.scala   | 35 ++
 .../org/apache/spark/util/ClosureCleaner.scala | 14 +++--
 .../main/scala/org/apache/spark/util/Utils.scala   | 20 +
 .../test/scala/org/apache/spark/FileSuite.scala| 15 +++---
 .../KryoSerializerDistributedSuite.scala   |  6 ++--
 .../spark/util/MutableURLClassLoaderSuite.scala|  5 +---
 6 files changed, 41 insertions(+), 54 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 2df133d..eef1997 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -130,7 +130,6 @@ class KryoSerializer(conf: SparkConf)
 val kryo = instantiator.newKryo()
 kryo.setRegistrationRequired(registrationRequired)
 
-val oldClassLoader = Thread.currentThread.getContextClassLoader
 val classLoader = 
defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
 
 // Allow disabling Kryo reference tracking if user knows their object 
graphs don't have loops.
@@ -156,24 +155,22 @@ class KryoSerializer(conf: SparkConf)
 kryo.register(classOf[GenericRecord], new 
GenericAvroSerializer(avroSchemas))
 kryo.register(classOf[GenericData.Record], new 
GenericAvroSerializer(avroSchemas))
 
-try {
-  // scalastyle:off classforname
-  // Use the default classloader when calling the user registrator.
-  Thread.currentThread.setContextClassLoader(classLoader)
-  // Register classes given through spark.kryo.classesToRegister.
-  classesToRegister
-.foreach { className => kryo.register(Class.forName(className, true, 
classLoader)) }
-  // Allow the user to register their own classes by setting 
spark.kryo.registrator.
-  userRegistrators
-.map(Class.forName(_, true, classLoader).getConstructor().
-  newInstance().asInstanceOf[KryoRegistrator])
-.foreach { reg => reg.registerClasses(kryo) }
-  // scalastyle:on classforname
-} catch {
-  case e: Exception =>
-throw new SparkException(s"Failed to register classes with Kryo", e)
-} finally {
-  Thread.currentThread.setContextClassLoader(oldClassLoader)
+// Use the default classloader when calling the user registrator.
+Utils.withContextClassLoader(classLoader) {
+  try {
+// Register classes given through spark.kryo.classesToRegister.
+classesToRegister.foreach { className =>
+  kryo.register(Utils.classForName(className, noSparkClassLoader = 
true))
+}
+// Allow the user to register their own classes by setting 
spark.kryo.registrator.
+userRegistrators
+  .map(Utils.classForName(_, noSparkClassLoader = 
true).getConstructor().
+newInstance().asInstanceOf[KryoRegistrator])
+  .foreach { reg => reg.registerClasses(kryo) }
+  } catch {
+case e: Exception =>
+  throw new SparkException(s"Failed to register classes with Kryo", e)
+  }
 }
 
 // Register Chill's classes; we do this after our ranges and the user's 
own classes to let
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala 
b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 1b3e525..5f725d8 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -378,10 +378,8 @@ private[spark] object ClosureCleaner extends Logging {
 } else {
   logDebug(s"Cleaning lambda: ${lambdaFunc.get.getImplMethodName}")
 
-  // scalastyle:off classforname
-  val captClass = 

[spark] branch master updated: [SPARK-27212][SQL] Eliminate TimeZone to ZoneId conversion in stringToTimestamp

2019-03-22 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a529be2  [SPARK-27212][SQL] Eliminate TimeZone to ZoneId conversion in 
stringToTimestamp
a529be2 is described below

commit a529be2930b1d69015f1ac8f85e590f197cf53cf
Author: Maxim Gekk 
AuthorDate: Fri Mar 22 18:01:29 2019 +0900

[SPARK-27212][SQL] Eliminate TimeZone to ZoneId conversion in 
stringToTimestamp

## What changes were proposed in this pull request?

In the PR, I propose to avoid the `TimeZone` to `ZoneId` conversion in 
`DateTimeUtils.stringToTimestamp` by changing signature of the method, and 
require a parameter of `ZoneId` type. This will allow to avoid unnecessary 
conversion (`TimeZone` -> `String` -> `ZoneId`) per each row.

Also the PR avoids creation of `ZoneId` instances from `ZoneOffset` because 
`ZoneOffset` is a sub-class, and the conversion is unnecessary too.

## How was this patch tested?

It was tested by `DateTimeUtilsSuite` and `CastSuite`.

Closes #24155 from MaxGekk/stringtotimestamp-zoneid.

Authored-by: Maxim Gekk 
Signed-off-by: Hyukjin Kwon 
---
 .../spark/sql/catalyst/expressions/Cast.scala  |  6 +--
 .../spark/sql/catalyst/parser/AstBuilder.scala |  6 +--
 .../spark/sql/catalyst/util/DateTimeUtils.scala|  8 ++--
 .../expressions/HashExpressionsSuite.scala |  8 ++--
 .../sql/catalyst/util/DateTimeUtilsSuite.scala | 46 +++---
 .../spark/sql/catalyst/util/UnsafeArraySuite.scala |  9 -
 .../execution/datasources/jdbc/JDBCRelation.scala  |  4 +-
 7 files changed, 47 insertions(+), 40 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index a70ed6d..72cb6b2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -364,7 +364,7 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
   // TimestampConverter
   private[this] def castToTimestamp(from: DataType): Any => Any = from match {
 case StringType =>
-  buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs, 
timeZone).orNull)
+  buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs, 
zoneId).orNull)
 case BooleanType =>
   buildCast[Boolean](_, b => if (b) 1L else 0)
 case LongType =>
@@ -1017,12 +1017,12 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
   from: DataType,
   ctx: CodegenContext): CastFunction = from match {
 case StringType =>
-  val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), 
timeZone.getClass)
+  val zid = ctx.addReferenceObj("zoneId", zoneId, "java.time.ZoneId")
   val longOpt = ctx.freshVariable("longOpt", classOf[Option[Long]])
   (c, evPrim, evNull) =>
 code"""
   scala.Option $longOpt =
-
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp($c, $tz);
+
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp($c, $zid);
   if ($longOpt.isDefined()) {
 $evPrim = ((Long) $longOpt.get()).longValue();
   } else {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index b1d6be5..38a61b8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -36,7 +36,7 @@ import 
org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getTimeZone, 
stringToDate, stringToTimestamp}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, 
stringToDate, stringToTimestamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -1593,8 +1593,8 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
   valueType match {
 case "DATE" => toLiteral(stringToDate, DateType)
 case "TIMESTAMP" =>
-  val timeZone = getTimeZone(SQLConf.get.sessionLocalTimeZone)
-  toLiteral(stringToTimestamp(_, timeZone), TimestampType)
+  val zoneId = getZoneId(SQLConf.get.sessionLocalTimeZone)
+