svn commit: r26334 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_13_20_01-73f2853-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-13 Thread pwendell
Author: pwendell
Date: Sat Apr 14 03:15:33 2018
New Revision: 26334

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_13_20_01-73f2853 docs


[This commit notification would consist of 1458 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23979][SQL] MultiAlias should not be a CodegenFallback

2018-04-13 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master cbb41a0c5 -> 73f28530d


[SPARK-23979][SQL] MultiAlias should not be a CodegenFallback

## What changes were proposed in this pull request?

Just found that `MultiAlias` is a `CodegenFallback`. It should not be, because 
`MultiAlias` is never actually evaluated.
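
For intuition, here is a minimal, hypothetical sketch of the difference between 
the two contracts (simplified types, not Catalyst's actual traits): a 
CodegenFallback-style expression still gets an interpreted evaluation path, 
while an Unevaluable-style expression must be eliminated during analysis and 
throws if anything tries to evaluate it.

```
// Hypothetical, simplified model; not the real Catalyst classes.
trait Expr { def eval(row: Seq[Any]): Any }

trait FallbackToInterpreted extends Expr          // rough analogue of CodegenFallback

trait NeverEvaluated extends Expr {               // rough analogue of Unevaluable
  final override def eval(row: Seq[Any]): Any =
    throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
}

// MultiAlias only carries the names until analysis resolves it, so it should
// never reach an evaluation path at all.
case class DemoMultiAlias(child: Expr, names: Seq[String]) extends NeverEvaluated
```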

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh 

Closes #21065 from viirya/multialias-without-codegenfallback.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73f28530
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73f28530
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73f28530

Branch: refs/heads/master
Commit: 73f28530d6f6dd8aba758ea818c456cf911a5f41
Parents: cbb41a0
Author: Liang-Chi Hsieh 
Authored: Sat Apr 14 08:59:04 2018 +0800
Committer: Wenchen Fan 
Committed: Sat Apr 14 08:59:04 2018 +0800

--
 .../org/apache/spark/sql/catalyst/analysis/unresolved.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/73f28530/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index a65f58f..71e2317 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.parser.ParserUtils
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -335,7 +335,7 @@ case class UnresolvedRegex(regexPattern: String, table: Option[String], caseSens
  * @param names the names to be associated with each output of computing [[child]].
  */
 case class MultiAlias(child: Expression, names: Seq[String])
-  extends UnaryExpression with NamedExpression with CodegenFallback {
+  extends UnaryExpression with NamedExpression with Unevaluable {
 
   override def name: String = throw new UnresolvedException(this, "name")
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common CheckpointFileManager interface

2018-04-13 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 558f31b31 -> cbb41a0c5


[SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common 
CheckpointFileManager interface

## What changes were proposed in this pull request?

Checkpoint files (offset log files, state store files) in Structured Streaming 
must be written atomically, so that no partial files are generated (partial 
files would break the fault-tolerance guarantees). Currently, there are three 
locations that try to do this individually, and in some cases incorrectly.

1. HDFSOffsetMetadataLog - This uses a FileManager interface so that it can 
work with either the `FileSystem` or the `FileContext` API. It prefers the 
`FileContext` implementation because FileContext on HDFS has atomic renames.
1. HDFSBackedStateStore (aka in-memory state store)
  - Writing a version.delta file - This uses only the FileSystem API to perform 
a rename, which is incorrect because rename is not atomic in the HDFS 
FileSystem implementation.
  - Writing a snapshot file - Same as above.

Current problems:
1. State Store behavior is incorrect - the HDFS FileSystem implementation does 
not have atomic rename.
1. Inflexible - Some file systems provide mechanisms other than 
write-to-temp-file-and-rename for writing atomically and more efficiently. For 
example, with S3 you can write directly to the final file, and it becomes 
visible only when the entire file has been written and closed correctly. Any 
failure can be made to abort the write without leaving any partial file visible 
in S3. The current code does not abstract this mechanism out enough for it to 
be customized.

Solution:

1. Introduce a common interface that all three cases above can use to write 
checkpoint files atomically.
2. This interface must provide the hooks needed to customize the 
write-and-rename mechanism.

This PR does that by introducing the interface `CheckpointFileManager` and 
modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use it. As with the 
earlier `FileManager`, there are implementations based on the `FileSystem` and 
`FileContext` APIs, and the latter is preferred so that it works correctly 
with HDFS.

The key method of this interface is `createAtomic(path, overwrite)`, which 
returns a `CancellableFSDataOutputStream` with a `cancel()` method. Callers 
must either call `close()` to successfully commit the file, or `cancel()` in 
case of an error.
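
As a rough illustration of that close-or-cancel contract, here is a hedged 
sketch of the intended usage pattern; `DemoCheckpointFileManager` and 
`CancellableStream` below are simplified stand-ins for illustration, not the 
actual Spark classes or signatures.

```
// Simplified stand-ins for CheckpointFileManager / CancellableFSDataOutputStream.
abstract class CancellableStream extends java.io.OutputStream {
  def cancel(): Unit   // abort the write; no partial file ever becomes visible
}

trait DemoCheckpointFileManager {
  def createAtomic(path: String, overwriteIfPossible: Boolean): CancellableStream
}

// Either close() to commit the file atomically, or cancel() on any error.
def writeCheckpoint(fm: DemoCheckpointFileManager, path: String, bytes: Array[Byte]): Unit = {
  val out = fm.createAtomic(path, overwriteIfPossible = false)
  try {
    out.write(bytes)
    out.close()
  } catch {
    case e: Throwable =>
      out.cancel()
      throw e
  }
}
```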

## How was this patch tested?
New tests in `CheckpointFileManagerSuite` and slightly modified existing tests.

Author: Tathagata Das 

Closes #21048 from tdas/SPARK-23966.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbb41a0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbb41a0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbb41a0c

Branch: refs/heads/master
Commit: cbb41a0c5b01579c85f06ef42cc0585fbef216c5
Parents: 558f31b
Author: Tathagata Das 
Authored: Fri Apr 13 16:31:39 2018 -0700
Committer: Tathagata Das 
Committed: Fri Apr 13 16:31:39 2018 -0700

--
 .../org/apache/spark/sql/internal/SQLConf.scala |   7 +
 .../streaming/CheckpointFileManager.scala   | 349 +++
 .../execution/streaming/HDFSMetadataLog.scala   | 229 +---
 .../state/HDFSBackedStateStoreProvider.scala| 120 +++
 .../execution/streaming/state/StateStore.scala  |   4 +-
 .../streaming/CheckpointFileManagerSuite.scala  | 192 ++
 .../CompactibleFileStreamLogSuite.scala |   5 -
 .../streaming/HDFSMetadataLogSuite.scala| 116 +-
 .../streaming/state/StateStoreSuite.scala   |  58 ++-
 9 files changed, 678 insertions(+), 402 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1c8ab9c..0dc47bf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -930,6 +930,13 @@ object SQLConf {
   .intConf
   .createWithDefault(100)
 
+  val STREAMING_CHECKPOINT_FILE_MANAGER_CLASS =
+buildConf("spark.sql.streaming.checkpointFileManagerClass")
+  .doc("The class used to write checkpoint files atomically. This class 
must be a subclass " +
+"of the interface CheckpointFileManager.")
+  .internal()
+  .stringConf
+
   val NDV_MAX_ERROR =
 buildConf("spark.sql.statistics.ndv.maxError")
   .internal()

http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql

svn commit: r26333 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_13_16_01-558f31b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-13 Thread pwendell
Author: pwendell
Date: Fri Apr 13 23:15:51 2018
New Revision: 26333

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_13_16_01-558f31b docs


[This commit notification would consist of 1458 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table

2018-04-13 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 25892f3cc -> 558f31b31


[SPARK-23963][SQL] Properly handle large number of columns in query on 
text-based Hive table

## What changes were proposed in this pull request?

TableReader would get disproportionately slower as the number of columns in the 
query increased.

I fixed the way TableReader looks up metadata for each column in the row. 
Previously, it had been looking up this data in linked lists, accessing each 
linked list by an index (column number). Now it looks up this data in arrays, 
where indexing by column number is a constant-time operation.
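
To see why this matters for wide rows, here is a standalone micro-example of 
the two access patterns (illustrative only, not the Spark code):

```
// Indexing a linked List by position is O(n) per lookup, while Array indexing
// is O(1), so per-column lookups add up quickly when metadata lives in Lists.
val columnCount = 10000
val asList: List[Int]   = List.tabulate(columnCount)(identity)
val asArray: Array[Int] = Array.tabulate(columnCount)(identity)

def sumByIndex(lookup: Int => Int): Long =
  (0 until columnCount).foldLeft(0L)((acc, i) => acc + lookup(i))

sumByIndex(i => asList(i))   // walks the list from the head for every index
sumByIndex(i => asArray(i))  // constant-time reads
```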

## How was this patch tested?

Manual testing
All sbt unit tests
python sql tests

Author: Bruce Robbins 

Closes #21043 from bersprockets/tabreadfix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/558f31b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/558f31b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/558f31b3

Branch: refs/heads/master
Commit: 558f31b31c73b7e9f26f56498b54cf53997b59b8
Parents: 25892f3
Author: Bruce Robbins 
Authored: Fri Apr 13 14:05:04 2018 -0700
Committer: gatorsmile 
Committed: Fri Apr 13 14:05:04 2018 -0700

--
 .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/558f31b3/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index cc8907a..b5444a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -381,7 +381,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
 
 val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
   soi.getStructFieldRef(attr.name) -> ordinal
-}.unzip
+}.toArray.unzip
 
 /**
  * Builds specific unwrappers ahead of time according to object inspector


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark-website git commit: Update committer page

2018-04-13 Thread dbtsai
Repository: spark-website
Updated Branches:
  refs/heads/asf-site 658467248 -> 69b595481


Update committer page

Author: DB Tsai 

Closes #113 from dbtsai/changeAffiliation.


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/69b59548
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/69b59548
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/69b59548

Branch: refs/heads/asf-site
Commit: 69b595481c6b6866a66f1e7a45265246eb36b2c1
Parents: 6584672
Author: DB Tsai 
Authored: Fri Apr 13 13:50:48 2018 -0700
Committer: DB Tsai 
Committed: Fri Apr 13 13:50:48 2018 -0700

--
 committers.md| 2 +-
 site/committers.html | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/69b59548/committers.md
--
diff --git a/committers.md b/committers.md
index f538476..eef9f2c 100644
--- a/committers.md
+++ b/committers.md
@@ -60,7 +60,7 @@ navigation:
 |Saisai Shao|Hortonworks|
 |Prashant Sharma|IBM|
 |Ram Sriharsha|Databricks|
-|DB Tsai|Netflix|
+|DB Tsai|Apple|
 |Takuya Ueshin|Databricks|
 |Marcelo Vanzin|Cloudera|
 |Shivaram Venkataraman|University of Wisconsin, Madison|

http://git-wip-us.apache.org/repos/asf/spark-website/blob/69b59548/site/committers.html
--
diff --git a/site/committers.html b/site/committers.html
index 014beae..c5b40e9 100644
--- a/site/committers.html
+++ b/site/committers.html
@@ -412,7 +412,7 @@
 
 
   DB Tsai
-  Netflix
+  Apple
 
 
   Takuya Ueshin


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r26330 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_13_12_01-25892f3-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-13 Thread pwendell
Author: pwendell
Date: Fri Apr 13 19:16:00 2018
New Revision: 26330

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_13_12_01-25892f3 docs


[This commit notification would consist of 1458 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer

2018-04-13 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 4dfd746de -> 25892f3cc


[SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer

## What changes were proposed in this pull request?

Added a new rule to remove a Sort operation when its child is already sorted.
For instance, this simple code:
```
spark.sparkContext.parallelize(Seq(("a", "b"))).toDF("a", "b").registerTempTable("table1")
val df = sql(s"""SELECT b
| FROM (
| SELECT a, b
| FROM table1
| ORDER BY a
| ) t
| ORDER BY a""".stripMargin)
df.explain(true)
```
Before the PR, this produces the plan:
```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
  +- 'Sort ['a ASC NULLS FIRST], true
 +- 'Project ['a, 'b]
+- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
  +- SubqueryAlias t
 +- Sort [a#6 ASC NULLS FIRST], true
+- Project [a#6, b#7]
   +- SubqueryAlias table1
  +- Project [_1#3 AS a#6, _2#4 AS b#7]
 +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS 
_1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, 
true, false) AS _2#4]
+- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
  +- Sort [a#6 ASC NULLS FIRST], true
 +- Project [_1#3 AS a#6, _2#4 AS b#7]
+- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS 
_2#4]
   +- ExternalRDD [obj#2]

== Physical Plan ==
*(3) Project [b#7]
+- *(3) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
  +- *(2) Project [b#7, a#6]
 +- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
   +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
  +- *(1) SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS 
_2#4]
 +- Scan ExternalRDDScan[obj#2]
```

while after the PR it produces:

```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
  +- 'Sort ['a ASC NULLS FIRST], true
 +- 'Project ['a, 'b]
+- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
  +- SubqueryAlias t
 +- Sort [a#6 ASC NULLS FIRST], true
+- Project [a#6, b#7]
   +- SubqueryAlias table1
  +- Project [_1#3 AS a#6, _2#4 AS b#7]
 +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS 
_1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, 
true, false) AS _2#4]
+- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [_1#3 AS a#6, _2#4 AS b#7]
  +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS 
_2#4]
 +- ExternalRDD [obj#2]

== Physical Plan ==
*(2) Project [b#7]
+- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 5)
  +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
 +- *(1) SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(input[0, scala.Tuple2

spark git commit: [SPARK-23896][SQL] Improve PartitioningAwareFileIndex

2018-04-13 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master a83ae0d9b -> 4dfd746de


[SPARK-23896][SQL] Improve PartitioningAwareFileIndex

## What changes were proposed in this pull request?

Currently `PartitioningAwareFileIndex` accepts an optional parameter 
`userPartitionSchema`. If provided, it will combine the inferred partition 
schema with the parameter.

However,
1. to get `userPartitionSchema`, we need to combine the inferred partition 
schema with `userSpecifiedSchema`
2. to get the inferred partition schema, we have to create a temporary file 
index.

Only after that can a final version of `PartitioningAwareFileIndex` be created.

This can be improved by passing `userSpecifiedSchema` to 
`PartitioningAwareFileIndex`.

With the improvement, we can reduce redundant code and avoid parsing the file 
partitions twice.
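
As a rough sketch of what "combining the inferred partition schema with 
`userSpecifiedSchema`" amounts to, mirroring the removed 
`combineInferredAndUserSpecifiedPartitionSchema` helper rather than quoting it 
(`Field` is a hypothetical stand-in for Catalyst's StructField):

```
// Prefer the user-specified definition of each inferred partition column when
// one exists, otherwise keep the inferred field (cf. SPARK-18510 / SPARK-21463).
case class Field(name: String, dataType: String)

def combinePartitionSchema(inferred: Seq[Field], userSpecified: Option[Seq[Field]]): Seq[Field] =
  inferred.map { partitionField =>
    userSpecified.flatMap(_.find(_.name == partitionField.name)).getOrElse(partitionField)
  }
```
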
## How was this patch tested?
Unit test

Author: Gengliang Wang 

Closes #21004 from gengliangwang/PartitioningAwareFileIndex.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dfd746d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dfd746d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dfd746d

Branch: refs/heads/master
Commit: 4dfd746de3f4346ed0c2191f8523a7e6cc9f064d
Parents: a83ae0d
Author: Gengliang Wang 
Authored: Sat Apr 14 00:22:38 2018 +0800
Committer: Wenchen Fan 
Committed: Sat Apr 14 00:22:38 2018 +0800

--
 .../datasources/CatalogFileIndex.scala  |   2 +-
 .../sql/execution/datasources/DataSource.scala  | 133 ---
 .../datasources/InMemoryFileIndex.scala |   8 +-
 .../PartitioningAwareFileIndex.scala|  54 +---
 .../streaming/MetadataLogFileIndex.scala|  10 +-
 .../datasources/FileSourceStrategySuite.scala   |   2 +-
 .../hive/PartitionedTablePerfStatsSuite.scala   |   2 +-
 7 files changed, 103 insertions(+), 108 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 4046396..a66a076 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -85,7 +85,7 @@ class CatalogFileIndex(
 sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
 } else {
   new InMemoryFileIndex(
-sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
+sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None)
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b84ea76..f16d824 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -104,24 +103,6 @@ case class DataSource(
   }
 
   /**
-   * In the read path, only managed tables by Hive provide the partition 
columns properly when
-   * initializing this class. All other file based data sources will try to 
infer the partitioning,
-   * and then cast the inferred types to user specified dataTypes if the 
partition columns exist
-   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs 
like SPARK-18510, or
-   * inconsistent data types as reported in SPARK-21463.
-   * @param fileIndex A FileIndex that will perform partition inference
-   * @return The PartitionSchema resolved from inference and cast according to 
`userSpecifiedSchema`
-   */
-  private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: 
FileIndex): StructType = {
-val resolved = fileIndex.partitionSchema.map { partitionField =>
-  // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise 
fallback to inferred
-  userSpecifiedSchema.flatMap(_.find

[1/3] spark git commit: [SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs

2018-04-13 Thread foxish
Repository: spark
Updated Branches:
  refs/heads/master 0323e6146 -> a83ae0d9b


http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
--
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
new file mode 100644
index 000..9d02f56
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesExecutorSpecificConf, SecretVolumeUtils, SparkPod}
+
+class MountSecretsFeatureStepSuite extends SparkFunSuite {
+
+  private val SECRET_FOO = "foo"
+  private val SECRET_BAR = "bar"
+  private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
+
+  test("mounts all given secrets") {
+val baseDriverPod = SparkPod.initialPod()
+val secretNamesToMountPaths = Map(
+  SECRET_FOO -> SECRET_MOUNT_PATH,
+  SECRET_BAR -> SECRET_MOUNT_PATH)
+val sparkConf = new SparkConf(false)
+val kubernetesConf = KubernetesConf(
+  sparkConf,
+  KubernetesExecutorSpecificConf("1", new PodBuilder().build()),
+  "resource-name-prefix",
+  "app-id",
+  Map.empty,
+  Map.empty,
+  secretNamesToMountPaths,
+  Map.empty)
+
+val step = new MountSecretsFeatureStep(kubernetesConf)
+val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod
+val driverContainerWithSecretsMounted = 
step.configurePod(baseDriverPod).container
+
+Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
+  assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, 
volumeName))
+}
+Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName =>
+  assert(SecretVolumeUtils.containerHasVolume(
+driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH))
+}
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
--
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 6a50159..c1b203e 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -16,22 +16,17 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
-import scala.collection.JavaConverters._
-
-import com.google.common.collect.Iterables
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
 import io.fabric8.kubernetes.client.dsl.{MixedOperation, 
NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.Mockito.{doReturn, verify, when}
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
 import org.scalatest.mockito.MockitoSugar._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, 
KubernetesDriverSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
 
 class ClientSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -39,6 +34,74 @@ class ClientSuite extends SparkFu

[2/3] spark git commit: [SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs

2018-04-13 Thread foxish
http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
--
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
deleted file mode 100644
index 43de329..000
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import java.io.File
-
-import io.fabric8.kubernetes.api.model.ContainerBuilder
-
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-/**
- * Step that configures the classpath, spark.jars, and spark.files for the 
driver given that the
- * user may provide remote files or files with local:// schemes.
- */
-private[spark] class DependencyResolutionStep(
-sparkJars: Seq[String],
-sparkFiles: Seq[String]) extends DriverConfigurationStep {
-
-  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
-val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(sparkJars)
-val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(sparkFiles)
-
-val sparkConf = driverSpec.driverSparkConf.clone()
-if (resolvedSparkJars.nonEmpty) {
-  sparkConf.set("spark.jars", resolvedSparkJars.mkString(","))
-}
-if (resolvedSparkFiles.nonEmpty) {
-  sparkConf.set("spark.files", resolvedSparkFiles.mkString(","))
-}
-val resolvedDriverContainer = if (resolvedSparkJars.nonEmpty) {
-  new ContainerBuilder(driverSpec.driverContainer)
-.addNewEnv()
-  .withName(ENV_MOUNTED_CLASSPATH)
-  .withValue(resolvedSparkJars.mkString(File.pathSeparator))
-  .endEnv()
-.build()
-} else {
-  driverSpec.driverContainer
-}
-
-driverSpec.copy(
-  driverContainer = resolvedDriverContainer,
-  driverSparkConf = sparkConf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
--
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
deleted file mode 100644
index 17614e0..000
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.submit.steps
-
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
-
-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
-
-  /**
-   * Apply some transformation to the previous state o

[3/3] spark git commit: [SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs

2018-04-13 Thread foxish
[SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs

## What changes were proposed in this pull request?

Breaks down the construction of driver pods and executor pods so that a common 
abstraction is used both when spark-submit creates the driver and when 
KubernetesClusterSchedulerBackend creates the executors. This encourages more 
code reuse and is more legible than the older approach.
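
As a rough mental model of that shared abstraction, here is a hypothetical, 
trimmed-down sketch; the names below (DemoPod, DemoFeatureStep, DemoPodBuilder) 
are illustrative stand-ins, not the classes introduced by this change.

```
// Each feature step transforms a pod-like value; a builder composes the same
// kind of steps for both the driver and the executors, so only the step list differs.
case class DemoPod(labels: Map[String, String], volumes: Seq[String])

trait DemoFeatureStep {
  def configurePod(pod: DemoPod): DemoPod
}

class DemoMountSecretsStep(secretVolumes: Seq[String]) extends DemoFeatureStep {
  override def configurePod(pod: DemoPod): DemoPod =
    pod.copy(volumes = pod.volumes ++ secretVolumes)
}

class DemoPodBuilder(steps: Seq[DemoFeatureStep]) {
  def build(initial: DemoPod): DemoPod =
    steps.foldLeft(initial)((pod, step) => step.configurePod(pod))
}
```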

The high-level design is discussed in more detail on the JIRA ticket. This pull 
request is the implementation of that design with some minor changes in the 
implementation details.

No user-facing behavior should break as a result of this change.

## How was this patch tested?

Migrated all unit tests from the old submission-steps architecture to the new 
architecture. Integration tests should not have to change, and they pass, given 
that this change shouldn't alter any outward behavior.

Author: mcheah 

Closes #20910 from mccheah/spark-22839-incremental.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a83ae0d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a83ae0d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a83ae0d9

Branch: refs/heads/master
Commit: a83ae0d9bc1b8f4909b9338370efe4020079bea7
Parents: 0323e61
Author: mcheah 
Authored: Fri Apr 13 08:43:58 2018 -0700
Committer: Anirudh Ramanathan 
Committed: Fri Apr 13 08:43:58 2018 -0700

--
 .../org/apache/spark/deploy/k8s/Config.scala|   2 +-
 .../spark/deploy/k8s/KubernetesConf.scala   | 184 ++
 .../spark/deploy/k8s/KubernetesDriverSpec.scala |  31 +++
 .../spark/deploy/k8s/KubernetesUtils.scala  |  11 -
 .../deploy/k8s/MountSecretsBootstrap.scala  |  72 --
 .../org/apache/spark/deploy/k8s/SparkPod.scala  |  34 +++
 .../k8s/features/BasicDriverFeatureStep.scala   | 136 ++
 .../k8s/features/BasicExecutorFeatureStep.scala | 179 ++
 ...DriverKubernetesCredentialsFeatureStep.scala | 216 
 .../k8s/features/DriverServiceFeatureStep.scala |  97 
 .../features/KubernetesFeatureConfigStep.scala  |  71 ++
 .../k8s/features/MountSecretsFeatureStep.scala  |  62 +
 .../k8s/submit/DriverConfigOrchestrator.scala   | 145 ---
 .../submit/KubernetesClientApplication.scala|  80 +++---
 .../k8s/submit/KubernetesDriverBuilder.scala|  56 +
 .../k8s/submit/KubernetesDriverSpec.scala   |  47 
 .../steps/BasicDriverConfigurationStep.scala| 163 
 .../submit/steps/DependencyResolutionStep.scala |  61 -
 .../submit/steps/DriverConfigurationStep.scala  |  30 ---
 .../steps/DriverKubernetesCredentialsStep.scala | 245 ---
 .../submit/steps/DriverMountSecretsStep.scala   |  38 ---
 .../steps/DriverServiceBootstrapStep.scala  | 104 
 .../cluster/k8s/ExecutorPodFactory.scala| 227 -
 .../cluster/k8s/KubernetesClusterManager.scala  |  12 +-
 .../k8s/KubernetesClusterSchedulerBackend.scala |  20 +-
 .../cluster/k8s/KubernetesExecutorBuilder.scala |  41 
 .../spark/deploy/k8s/KubernetesConfSuite.scala  | 175 +
 .../spark/deploy/k8s/KubernetesUtilsTest.scala  |  36 ---
 .../features/BasicDriverFeatureStepSuite.scala  | 153 
 .../BasicExecutorFeatureStepSuite.scala | 179 ++
 ...rKubernetesCredentialsFeatureStepSuite.scala | 174 +
 .../DriverServiceFeatureStepSuite.scala | 227 +
 .../features/KubernetesFeaturesTestUtils.scala  |  61 +
 .../features/MountSecretsFeatureStepSuite.scala |  58 +
 .../spark/deploy/k8s/submit/ClientSuite.scala   | 216 
 .../submit/DriverConfigOrchestratorSuite.scala  | 131 --
 .../submit/KubernetesDriverBuilderSuite.scala   | 102 
 .../BasicDriverConfigurationStepSuite.scala | 122 -
 .../steps/DependencyResolutionStepSuite.scala   |  69 --
 .../DriverKubernetesCredentialsStepSuite.scala  | 153 
 .../steps/DriverMountSecretsStepSuite.scala |  49 
 .../steps/DriverServiceBootstrapStepSuite.scala | 180 --
 .../cluster/k8s/ExecutorPodFactorySuite.scala   | 195 ---
 ...KubernetesClusterSchedulerBackendSuite.scala |  37 +--
 .../k8s/KubernetesExecutorBuilderSuite.scala|  75 ++
 45 files changed, 2482 insertions(+), 2274 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
--
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 82f6c71..4086970 100644
--- 
a/reso

svn commit: r26320 - in /dev/spark/2.3.1-SNAPSHOT-2018_04_13_02_01-dfdf1bb-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-13 Thread pwendell
Author: pwendell
Date: Fri Apr 13 09:15:58 2018
New Revision: 26320

Log:
Apache Spark 2.3.1-SNAPSHOT-2018_04_13_02_01-dfdf1bb docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r26319 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_13_00_01-0323e61-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-13 Thread pwendell
Author: pwendell
Date: Fri Apr 13 07:17:57 2018
New Revision: 26319

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_13_00_01-0323e61 docs


[This commit notification would consist of 1458 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23905][SQL] Add UDF weekday

2018-04-13 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 4b0703679 -> 0323e6146


[SPARK-23905][SQL] Add UDF weekday

## What changes were proposed in this pull request?

Add UDF weekday
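
For reference, here is a quick standalone illustration (outside Spark) of the 
numbering convention the new function uses, based on the `(DAY_OF_WEEK + 5) % 7` 
shift visible in the diff below; `weekday` here is a hypothetical helper, not 
the Spark expression itself.

```
// java.util.Calendar reports 1 = Sunday .. 7 = Saturday; shifting by (+ 5) % 7
// maps that onto 0 = Monday .. 6 = Sunday, the convention documented for weekday.
import java.util.{Calendar, TimeZone}

def weekday(daysSinceEpoch: Int): Int = {
  val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
  cal.setTimeInMillis(daysSinceEpoch * 24L * 3600L * 1000L)
  (cal.get(Calendar.DAY_OF_WEEK) + 5) % 7
}

// 2009-07-30 (the example used in the SQL docs) was a Thursday:
weekday(java.time.LocalDate.of(2009, 7, 30).toEpochDay.toInt)  // == 3
```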

## How was this patch tested?

A new test

Author: yucai 

Closes #21009 from yucai/SPARK-23905.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0323e614
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0323e614
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0323e614

Branch: refs/heads/master
Commit: 0323e61465ee747c9a57a70e9d6108876499546e
Parents: 4b07036
Author: yucai 
Authored: Fri Apr 13 00:00:04 2018 -0700
Committer: gatorsmile 
Committed: Fri Apr 13 00:00:04 2018 -0700

--
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../expressions/datetimeExpressions.scala   | 55 
 .../expressions/DateExpressionsSuite.scala  | 11 
 .../resources/sql-tests/inputs/datetime.sql |  2 +
 .../sql-tests/results/datetime.sql.out  |  9 +++-
 5 files changed, 67 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0323e614/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 747016b..131b958 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -395,6 +395,7 @@ object FunctionRegistry {
 expression[TruncTimestamp]("date_trunc"),
 expression[UnixTimestamp]("unix_timestamp"),
 expression[DayOfWeek]("dayofweek"),
+expression[WeekDay]("weekday"),
 expression[WeekOfYear]("weekofyear"),
 expression[Year]("year"),
 expression[TimeWindow]("window"),

http://git-wip-us.apache.org/repos/asf/spark/blob/0323e614/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 32fdb13..b9b2cd5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -426,36 +426,71 @@ case class DayOfMonth(child: Expression) extends UnaryExpression with ImplicitCa
   """,
   since = "2.3.0")
 // scalastyle:on line.size.limit
-case class DayOfWeek(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
+case class DayOfWeek(child: Expression) extends DayWeek {
 
-  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)
-
-  override def dataType: DataType = IntegerType
+  override protected def nullSafeEval(date: Any): Any = {
+cal.setTimeInMillis(date.asInstanceOf[Int] * 1000L * 3600L * 24L)
+cal.get(Calendar.DAY_OF_WEEK)
+  }
 
-  @transient private lazy val c = {
-Calendar.getInstance(DateTimeUtils.getTimeZone("UTC"))
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, time => {
+  val cal = classOf[Calendar].getName
+  val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+  val c = "calDayOfWeek"
+  ctx.addImmutableStateIfNotExists(cal, c,
+v => s"""$v = $cal.getInstance($dtu.getTimeZone("UTC"));""")
+  s"""
+$c.setTimeInMillis($time * 1000L * 3600L * 24L);
+${ev.value} = $c.get($cal.DAY_OF_WEEK);
+  """
+})
   }
+}
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(date) - Returns the day of the week for date/timestamp (0 = 
Monday, 1 = Tuesday, ..., 6 = Sunday).",
+  examples = """
+Examples:
+  > SELECT _FUNC_('2009-07-30');
+   3
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class WeekDay(child: Expression) extends DayWeek {
 
   override protected def nullSafeEval(date: Any): Any = {
-c.setTimeInMillis(date.asInstanceOf[Int] * 1000L * 3600L * 24L)
-c.get(Calendar.DAY_OF_WEEK)
+cal.setTimeInMillis(date.asInstanceOf[Int] * 1000L * 3600L * 24L)
+(cal.get(Calendar.DAY_OF_WEEK) + 5 ) % 7
   }
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
 nullSafeCodeGen(ctx, ev, time => {
   val cal = classOf[Calendar].getName
   val dtu