spark git commit: [SPARK-25010][SQL] Rand/Randn should produce different values for each execution in streaming query

2018-08-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 4446a0b0d -> 43763629f


[SPARK-25010][SQL] Rand/Randn should produce different values for each 
execution in streaming query

## What changes were proposed in this pull request?

As with Uuid before SPARK-24896, the Rand and Randn expressions currently produce the same 
results for each execution of a streaming query. That doesn't make much sense 
for streaming queries, so we should make them produce different results on each execution, as Uuid now does.

In this change, similar to Uuid, we assign new random seeds to Rand/Randn when 
returning the optimized plan from `IncrementalExecution`.

Note: Unlike Uuid, Rand/Randn can be created with an initial seed. Because 
we replace this initial seed in `IncrementalExecution`, the initial seed is no 
longer used. For now this doesn't seem to be a big issue for streaming 
queries, but it needs confirmation from others. cc zsxwing cloud-fan
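
For illustration, a minimal, self-contained sketch of the seed-renewal idea; the trait and class names mirror the patch, but this is not the actual Catalyst code (the real `withNewSeed` returns a Catalyst `Expression`):

```
import scala.util.Random

// Minimal stand-ins that mirror the names in this patch; an illustration of
// the seed-renewal idea, not the actual Catalyst classes.
trait ExpressionWithRandomSeed {
  def withNewSeed(seed: Long): ExpressionWithRandomSeed
}

case class Rand(seed: Long) extends ExpressionWithRandomSeed {
  override def withNewSeed(seed: Long): Rand = Rand(seed)
}

object SeedRenewal {
  // What IncrementalExecution effectively does when it returns the optimized
  // plan: every seed-carrying expression gets a fresh seed, so each run of the
  // streaming query draws different random values.
  def renewSeeds(exprs: Seq[ExpressionWithRandomSeed]): Seq[ExpressionWithRandomSeed] =
    exprs.map(_.withNewSeed(Random.nextLong()))

  def main(args: Array[String]): Unit = {
    println(renewSeeds(Seq(Rand(42L))))  // e.g. List(Rand(-3292...)): a new seed per run
  }
}
```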

## How was this patch tested?

Added test.

Closes #21980 from viirya/SPARK-25010.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43763629
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43763629
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43763629

Branch: refs/heads/master
Commit: 43763629f1d1a220cd91e2aed89152d065dfba24
Parents: 4446a0b
Author: Liang-Chi Hsieh 
Authored: Tue Aug 7 14:28:14 2018 +0800
Committer: Wenchen Fan 
Committed: Tue Aug 7 14:28:14 2018 +0800

--
 .../spark/sql/catalyst/expressions/misc.scala   |  5 -
 .../expressions/randomExpressions.scala | 16 --
 .../streaming/IncrementalExecution.scala| 10 +++--
 .../sql/streaming/StreamingQuerySuite.scala | 22 +++-
 4 files changed, 42 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/43763629/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 5d98dac..0cdeda9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -126,10 +126,13 @@ case class CurrentDatabase() extends LeafExpression with 
Unevaluable {
   """,
   note = "The function is non-deterministic.")
 // scalastyle:on line.size.limit
-case class Uuid(randomSeed: Option[Long] = None) extends LeafExpression with Stateful {
+case class Uuid(randomSeed: Option[Long] = None) extends LeafExpression with Stateful
+  with ExpressionWithRandomSeed {
 
   def this() = this(None)
 
+  override def withNewSeed(seed: Long): Uuid = Uuid(Some(seed))
+
   override lazy val resolved: Boolean = randomSeed.isDefined
 
   override def nullable: Boolean = false

http://git-wip-us.apache.org/repos/asf/spark/blob/43763629/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
index 926c2f0..b70c341 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala
@@ -57,6 +57,14 @@ abstract class RDG extends UnaryExpression with 
ExpectsInputTypes with Stateful
   override def inputTypes: Seq[AbstractDataType] = 
Seq(TypeCollection(IntegerType, LongType))
 }
 
+/**
+ * Represents the behavior of expressions which have a random seed and can renew the seed.
+ * Usually the random seed needs to be renewed at each execution under streaming queries.
+ */
+trait ExpressionWithRandomSeed {
+  def withNewSeed(seed: Long): Expression
+}
+
 /** Generate a random column with i.i.d. uniformly distributed values in [0, 
1). */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
@@ -72,10 +80,12 @@ abstract class RDG extends UnaryExpression with 
ExpectsInputTypes with Stateful
   """,
   note = "The function is non-deterministic in general case.")
 // scalastyle:on line.size.limit
-case class Rand(child: Expression) extends RDG {
+case class Rand(child: Expression) extends RDG with ExpressionWithRandomSeed {
 
   def this() = this(Literal(Utils.random.nextLong(), LongType))
 
+  override def withNewSeed(seed: Long): Rand = Rand(Literal(seed, LongType))
+
   override protected def evalInternal(

svn commit: r28585 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_06_20_02-51bee7a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-06 Thread pwendell
Author: pwendell
Date: Tue Aug  7 03:16:00 2018
New Revision: 28585

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_06_20_02-51bee7a docs


[This commit notification would consist of 1473 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-23914][SQL][FOLLOW-UP] refactor ArrayUnion

2018-08-06 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master 51bee7aca -> 4446a0b0d


[SPARK-23914][SQL][FOLLOW-UP] refactor ArrayUnion

## What changes were proposed in this pull request?

This PR refactors `ArrayUnion` based on [this suggestion](https://github.com/apache/spark/pull/21103#discussion_r205668821).
1. Generate optimized code for all of the primitive types except `boolean`
1. Generate code using `ArrayBuilder` or `ArrayBuffer`
1. Leave only a generic path in the interpreted path (a simplified sketch of these semantics is shown below)
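
A simplified, self-contained sketch of what the generic path computes, using plain Scala collections rather than the Catalyst `ArrayData`/`OpenHashSet` types that appear in the diff below:

```
import scala.collection.mutable

// Union the two arrays, keep at most one null, and de-duplicate while
// preserving first-seen order. The real implementation works on Catalyst's
// ArrayData, uses OpenHashSet, and additionally generates specialized code
// for primitive element types.
object ArrayUnionSketch {
  def union(a1: Seq[Any], a2: Seq[Any]): Seq[Any] = {
    val seen = mutable.HashSet.empty[Any]
    val result = mutable.ArrayBuffer.empty[Any]
    var foundNull = false
    (a1 ++ a2).foreach {
      case null => if (!foundNull) { result += null; foundNull = true }
      case elem => if (seen.add(elem)) result += elem
    }
    result
  }

  def main(args: Array[String]): Unit =
    println(union(Seq(1, 2, null, 2), Seq(2, 3, null)))  // ArrayBuffer(1, 2, null, 3)
}
```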

## How was this patch tested?

Existing tests

Author: Kazuaki Ishizaki 

Closes #21937 from kiszk/SPARK-23914-follow.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4446a0b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4446a0b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4446a0b0

Branch: refs/heads/master
Commit: 4446a0b0d9bd830f0e903d6780dedac4db572b5a
Parents: 51bee7a
Author: Kazuaki Ishizaki 
Authored: Tue Aug 7 12:07:56 2018 +0900
Committer: Takuya UESHIN 
Committed: Tue Aug 7 12:07:56 2018 +0900

--
 .../expressions/collectionOperations.scala  | 325 +++
 .../CollectionExpressionsSuite.scala|  21 +-
 .../spark/sql/DataFrameFunctionsSuite.scala |  24 +-
 3 files changed, 153 insertions(+), 217 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4446a0b0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index e385c2d..fbb1826 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -3767,230 +3767,159 @@ object ArraySetLike {
   """,
   since = "2.4.0")
 case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike
-with ComplexTypeMergingExpression {
-  var hsInt: OpenHashSet[Int] = _
-  var hsLong: OpenHashSet[Long] = _
-
-  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): 
Boolean = {
-val elem = array.getInt(idx)
-if (!hsInt.contains(elem)) {
-  if (resultArray != null) {
-resultArray.setInt(pos, elem)
-  }
-  hsInt.add(elem)
-  true
-} else {
-  false
-}
-  }
-
-  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
-val elem = array.getLong(idx)
-if (!hsLong.contains(elem)) {
-  if (resultArray != null) {
-resultArray.setLong(pos, elem)
-  }
-  hsLong.add(elem)
-  true
-} else {
-  false
-}
-  }
+  with ComplexTypeMergingExpression {
 
-  def evalIntLongPrimitiveType(
-  array1: ArrayData,
-  array2: ArrayData,
-  resultArray: ArrayData,
-  isLongType: Boolean): Int = {
-// store elements into resultArray
-var nullElementSize = 0
-var pos = 0
-Seq(array1, array2).foreach { array =>
-  var i = 0
-  while (i < array.numElements()) {
-val size = if (!isLongType) hsInt.size else hsLong.size
-if (size + nullElementSize > 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
-  ArraySetLike.throwUnionLengthOverflowException(size)
-}
-if (array.isNullAt(i)) {
-  if (nullElementSize == 0) {
-if (resultArray != null) {
-  resultArray.setNullAt(pos)
+  @transient lazy val evalUnion: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+val hs = new OpenHashSet[Any]
+var foundNullElement = false
+Seq(array1, array2).foreach { array =>
+  var i = 0
+  while (i < array.numElements()) {
+if (array.isNullAt(i)) {
+  if (!foundNullElement) {
+arrayBuffer += null
+foundNullElement = true
+  }
+} else {
+  val elem = array.get(i, elementType)
+  if (!hs.contains(elem)) {
+if (arrayBuffer.size > 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  
ArraySetLike.throwUnionLengthOverflowException(arrayBuffer.size)
+}
+arrayBuffer += elem
+hs.add(elem)
+  }
 }
-pos += 1
-nullElementSize = 1
+i += 1
   }
-} else {
-  val assigned = if (!isLongType) {
-assignInt(array

spark git commit: [SPARK-25018][INFRA] Use `Co-authored-by` and `Signed-off-by` git trailer in `merge_spark_pr.py`

2018-08-06 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 18b6ec147 -> 51bee7aca


[SPARK-25018][INFRA] Use `Co-authored-by` and `Signed-off-by` git trailer in 
`merge_spark_pr.py`

## What changes were proposed in this pull request?

In the [Linux community](https://git.wiki.kernel.org/index.php/CommitMessageConventions), 
the `Co-authored-by` and `Signed-off-by` git trailers have been used for a while.

Recently, GitHub adopted `Co-authored-by` to include the work of 
co-authors in the profile contributions graph and the repository's statistics. 
It's a convention for recognizing multiple authors, and it can encourage people to 
collaborate in OSS communities.

Git provides command line tools to read this metadata and find out who committed 
code upstream, but that is not as convenient as having `Signed-off-by` as part of the 
message, which lets developers easily find the committers who can help with 
a certain part of the codebase.

For a single-author PR, I propose to use `Authored-by` and `Signed-off-by`, so 
the message will look like:

```
Authored-by: Author's name 
Signed-off-by: Committer's name 
```

For a multi-author PR, I propose to use `Lead-authored-by:` and 
`Co-authored-by:` for the lead author and co-authors. The message will look like:

```
Lead-authored-by: Lead Author's name 
Co-authored-by: CoAuthor's name 
Signed-off-by: Committer's name 
```

It's also useful to include `Reviewed-by:` to give credit to the people who 
participated in the code review. We can add this in the next iteration.

Closes #21991 from dbtsai/script.

Lead-authored-by: DB Tsai 
Co-authored-by: Liang-Chi Hsieh 
Co-authored-by: Brian Lindblom 
Co-authored-by: hyukjinkwon 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51bee7ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51bee7ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51bee7ac

Branch: refs/heads/master
Commit: 51bee7aca13451167fa3e701fcd60f023eae5e61
Parents: 18b6ec1
Author: DB Tsai 
Authored: Tue Aug 7 10:31:11 2018 +0800
Committer: hyukjinkwon 
Committed: Tue Aug 7 10:31:11 2018 +0800

--
 dev/merge_spark_pr.py | 20 +++-
 1 file changed, 15 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/51bee7ac/dev/merge_spark_pr.py
--
diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py
index fd3eeb0..7a6f7d2 100755
--- a/dev/merge_spark_pr.py
+++ b/dev/merge_spark_pr.py
@@ -142,6 +142,11 @@ def merge_pr(pr_num, target_ref, title, body, 
pr_repo_desc):
 distinct_authors[0])
 if primary_author == "":
 primary_author = distinct_authors[0]
+else:
+# When primary author is specified manually, de-dup it from author 
list and
+# put it at the head of author list.
+distinct_authors = list(filter(lambda x: x != primary_author, 
distinct_authors))
+distinct_authors.insert(0, primary_author)
 
 commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
   '--pretty=format:%h [%an] %s']).split("\n\n")
@@ -154,13 +159,10 @@ def merge_pr(pr_num, target_ref, title, body, 
pr_repo_desc):
 # to people every time someone creates a public fork of Spark.
 merge_message_flags += ["-m", body.replace("@", "")]
 
-authors = "\n".join(["Author: %s" % a for a in distinct_authors])
-
-merge_message_flags += ["-m", authors]
+committer_name = run_cmd("git config --get user.name").strip()
+committer_email = run_cmd("git config --get user.email").strip()
 
 if had_conflicts:
-committer_name = run_cmd("git config --get user.name").strip()
-committer_email = run_cmd("git config --get user.email").strip()
 message = "This patch had conflicts when merged, resolved 
by\nCommitter: %s <%s>" % (
 committer_name, committer_email)
 merge_message_flags += ["-m", message]
@@ -168,6 +170,14 @@ def merge_pr(pr_num, target_ref, title, body, 
pr_repo_desc):
 # The string "Closes #%s" string is required for GitHub to correctly close 
the PR
 merge_message_flags += ["-m", "Closes #%s from %s." % (pr_num, 
pr_repo_desc)]
 
+authors = "Authored-by:" if len(distinct_authors) == 1 else 
"Lead-authored-by:"
+authors += " %s" % (distinct_authors.pop(0))
+if len(distinct_authors) > 0:
+authors += "\n" + "\n".join(["Co-authored-by: %s" % a for a in 
distinct_authors])
+authors += "\n" + "Signed-off-by: %s <%s>" % (committer_name, 
committer_email)
+
+merge_message_flags += ["-m", authors]
+
 run_cmd(['git', 'commit', '--author="%s"' % primary_author] + 
merge_message_flags)
 
 continue_maybe("Merge complete (local ref %s). Push to %s?" % (



spark git commit: [SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery Progress

2018-08-06 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 6afe6f32c -> 18b6ec147


[SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery 
Progress

## What changes were proposed in this pull request?

Currently, Structured Streaming sources and sinks do not have a way to 
report custom metrics. Providing an option to report custom metrics and making 
them available via the streaming query progress enables sources and sinks to 
report custom progress information (e.g. the lag metrics for the Kafka source).

Similar metrics can be reported for sinks as well, but we would like to get 
initial feedback before proceeding further.
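
The `CustomMetrics` contract added below is a single `json()` method. As a hedged sketch of what a source-side implementation could look like (the trait here is a local stand-in so the example is self-contained; the class name and lag numbers are invented):

```
// A Kafka-style lag metric exposed through a json() method, mirroring the
// CustomMetrics interface added in this patch. Stand-in trait for illustration.
trait CustomMetrics { def json(): String }

final class KafkaLagMetrics(lagPerPartition: Map[Int, Long]) extends CustomMetrics {
  // Report total and per-partition lag as a small JSON document.
  override def json(): String = {
    val perPartition = lagPerPartition.map { case (p, l) => s"\"$p\":$l" }.mkString(",")
    s"""{"totalLag":${lagPerPartition.values.sum},"lagPerPartition":{$perPartition}}"""
  }
}

object CustomMetricsDemo extends App {
  println(new KafkaLagMetrics(Map(0 -> 12L, 1 -> 3L)).json())
  // e.g. {"totalLag":15,"lagPerPartition":{"0":12,"1":3}}
}
```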

## How was this patch tested?

New and existing unit tests.


Closes #21721 from arunmahadevan/SPARK-24748.

Authored-by: Arun Mahadevan 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18b6ec14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18b6ec14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18b6ec14

Branch: refs/heads/master
Commit: 18b6ec14716bfafc25ae281b190547ea58b59af1
Parents: 6afe6f3
Author: Arun Mahadevan 
Authored: Tue Aug 7 10:28:26 2018 +0800
Committer: hyukjinkwon 
Committed: Tue Aug 7 10:28:26 2018 +0800

--
 .../spark/sql/sources/v2/CustomMetrics.java | 33 ++
 .../streaming/SupportsCustomReaderMetrics.java  | 47 +++
 .../streaming/SupportsCustomWriterMetrics.java  | 47 +++
 .../execution/streaming/ProgressReporter.scala  | 63 ++--
 .../streaming/sources/MicroBatchWriter.scala|  2 +-
 .../execution/streaming/sources/memoryV2.scala  | 32 --
 .../apache/spark/sql/streaming/progress.scala   | 46 --
 .../execution/streaming/MemorySinkV2Suite.scala | 22 +++
 .../sql/streaming/StreamingQuerySuite.scala | 28 +
 9 files changed, 306 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/18b6ec14/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java
new file mode 100644
index 000..7011a70
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface for reporting custom metrics from streaming sources and sinks
+ */
+@InterfaceStability.Evolving
+public interface CustomMetrics {
+  /**
+   * Returns a JSON serialized representation of custom metrics
+   *
+   * @return JSON serialized representation of custom metrics
+   */
+  String json();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/18b6ec14/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
new file mode 100644
index 000..3b293d9
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain 

spark git commit: [SPARK-24637][SS] Add metrics regarding state and watermark to dropwizard metrics

2018-08-06 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 1076e4f00 -> 6afe6f32c


[SPARK-24637][SS] Add metrics regarding state and watermark to dropwizard 
metrics

## What changes were proposed in this pull request?

The patch adds metrics regarding state and watermark to dropwizard metrics, so 
that the watermark and state rows/size can be tracked in a time-series manner.
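
As a sketch of how these gauges can be observed, assuming the usual setup: streaming metrics are only published when `spark.sql.streaming.metricsEnabled` is set, and a metrics sink (here the CSV sink used for the manual test mentioned below) is configured in `conf/metrics.properties`:

```
import org.apache.spark.sql.SparkSession

// Assumed metrics.properties entries for the CSV sink, e.g.:
//   *.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
//   *.sink.csv.period=10
//   *.sink.csv.directory=/tmp/spark-metrics
val spark = SparkSession.builder().master("local[2]").appName("streaming-metrics").getOrCreate()
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")

// Any streaming query will do; the built-in "rate" source keeps this self-contained.
val query = spark.readStream.format("rate").load()
  .writeStream.format("console").start()
// The new gauges (eventTime-watermark, states-rowsTotal, states-usedBytes) are
// then reported per query alongside the existing inputRate/processingRate ones.
```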

## How was this patch tested?

Manually tested with CSV metric sink.

Closes #21622 from HeartSaVioR/SPARK-24637.

Authored-by: Jungtaek Lim 
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6afe6f32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6afe6f32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6afe6f32

Branch: refs/heads/master
Commit: 6afe6f32ca2880b13bb5fb4397b2058eef12952b
Parents: 1076e4f
Author: Jungtaek Lim 
Authored: Tue Aug 7 10:12:22 2018 +0800
Committer: hyukjinkwon 
Committed: Tue Aug 7 10:12:22 2018 +0800

--
 .../execution/streaming/MetricsReporter.scala   | 20 
 .../sql/streaming/StreamingQuerySuite.scala |  3 +++
 2 files changed, 23 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6afe6f32/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
index 66b11ec..8709822 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.text.SimpleDateFormat
+
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.streaming.StreamingQueryProgress
 
 /**
@@ -39,6 +42,23 @@ class MetricsReporter(
   registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
   registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 
0L)
 
+  private val timestampFormat = new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+  registerGauge("eventTime-watermark",
+progress => 
convertStringDateToMillis(progress.eventTime.get("watermark")), 0L)
+
+  registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 
0L)
+  registerGauge("states-usedBytes", 
_.stateOperators.map(_.memoryUsedBytes).sum, 0L)
+
+  private def convertStringDateToMillis(isoUtcDateStr: String) = {
+if (isoUtcDateStr != null) {
+  timestampFormat.parse(isoUtcDateStr).getTime
+} else {
+  0L
+}
+  }
+
   private def registerGauge[T](
   name: String,
   f: StreamingQueryProgress => T,

http://git-wip-us.apache.org/repos/asf/spark/blob/6afe6f32/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index f37f368..9cceec9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -467,6 +467,9 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
 assert(gauges.get("latency").getValue.asInstanceOf[Long] == 0)
 
assert(gauges.get("processingRate-total").getValue.asInstanceOf[Double] == 0.0)
 assert(gauges.get("inputRate-total").getValue.asInstanceOf[Double] == 
0.0)
+assert(gauges.get("eventTime-watermark").getValue.asInstanceOf[Long] 
== 0)
+assert(gauges.get("states-rowsTotal").getValue.asInstanceOf[Long] == 0)
+assert(gauges.get("states-usedBytes").getValue.asInstanceOf[Long] == 0)
 sq.stop()
   }
 }





spark git commit: [MINOR][DOCS] Fix grammatical error in SortShuffleManager

2018-08-06 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 0f3fa2f28 -> 1076e4f00


[MINOR][DOCS] Fix grammatical error in SortShuffleManager

## What changes were proposed in this pull request?

Fix a grammatical error in the comment of SortShuffleManager.

## How was this patch tested?

N/A

Closes #21956 from deshanxiao/master.

Authored-by: deshanxiao <42019462+deshanx...@users.noreply.github.com>
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1076e4f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1076e4f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1076e4f0

Branch: refs/heads/master
Commit: 1076e4f0026914804b5948ff0da0c84def1315cc
Parents: 0f3fa2f
Author: deshanxiao <42019462+deshanx...@users.noreply.github.com>
Authored: Tue Aug 7 09:36:37 2018 +0800
Committer: hyukjinkwon 
Committed: Tue Aug 7 09:36:37 2018 +0800

--
 .../scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1076e4f0/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index d9fad64..0caf84c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -27,7 +27,7 @@ import org.apache.spark.shuffle._
  * In sort-based shuffle, incoming records are sorted according to their 
target partition ids, then
  * written to a single map output file. Reducers fetch contiguous regions of 
this file in order to
  * read their portion of the map output. In cases where the map output data is 
too large to fit in
- * memory, sorted subsets of the output can are spilled to disk and those 
on-disk files are merged
+ * memory, sorted subsets of the output can be spilled to disk and those 
on-disk files are merged
  * to produce the final output file.
  *
  * Sort-based shuffle has two different write paths for producing its map 
output files:





spark git commit: [SPARK-24996][SQL] Use DSL in DeclarativeAggregate

2018-08-06 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 408a3ff2c -> 0f3fa2f28


[SPARK-24996][SQL] Use DSL in DeclarativeAggregate

## What changes were proposed in this pull request?

The PR refactors the aggregate expressions that were not using the DSL, in order to 
simplify them.
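
For illustration, a small sketch of the kind of simplification the Catalyst DSL enables (requires `spark-catalyst` on the classpath; the attribute names are made up):

```
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Add, Expression, If, IsNull, Literal}

object DslIllustration extends App {
  val count: Expression = 'count.long
  val child: Expression = 'child.int

  // Without the DSL, every wrapper is spelled out explicitly:
  val verbose = If(IsNull(child), count, Add(count, Literal(1L)))

  // With the DSL, as in the refactored aggregates below, the same expression reads:
  val concise = If(child.isNull, count, count + 1L)

  println(verbose.semanticEquals(concise))  // true: only the notation differs
}
```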

## How was this patch tested?

NA

Author: Marco Gaido 

Closes #21970 from mgaido91/SPARK-24996.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f3fa2f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f3fa2f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f3fa2f2

Branch: refs/heads/master
Commit: 0f3fa2f289f53a8ceea3b0a52fa6dc319001b10b
Parents: 408a3ff
Author: Marco Gaido 
Authored: Mon Aug 6 19:46:51 2018 -0400
Committer: Xiao Li 
Committed: Mon Aug 6 19:46:51 2018 -0400

--
 .../apache/spark/sql/catalyst/dsl/package.scala |  2 +
 .../expressions/aggregate/Average.scala |  2 +-
 .../aggregate/CentralMomentAgg.scala| 40 +---
 .../catalyst/expressions/aggregate/Corr.scala   | 13 +++
 .../expressions/aggregate/Covariance.scala  | 16 
 .../catalyst/expressions/aggregate/First.scala  |  7 ++--
 .../catalyst/expressions/aggregate/Last.scala   |  7 ++--
 .../catalyst/expressions/aggregate/Max.scala|  5 ++-
 .../catalyst/expressions/aggregate/Min.scala|  5 ++-
 .../catalyst/expressions/aggregate/Sum.scala|  7 ++--
 .../expressions/windowExpressions.scala | 30 +++
 11 files changed, 65 insertions(+), 69 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0f3fa2f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 75387fa..2b582b5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -167,6 +167,8 @@ package object dsl {
 def upper(e: Expression): Expression = Upper(e)
 def lower(e: Expression): Expression = Lower(e)
 def coalesce(args: Expression*): Expression = Coalesce(args)
+def greatest(args: Expression*): Expression = Greatest(args)
+def least(args: Expression*): Expression = Least(args)
 def sqrt(e: Expression): Expression = Sqrt(e)
 def abs(e: Expression): Expression = Abs(e)
 def star(names: String*): Expression = names match {

http://git-wip-us.apache.org/repos/asf/spark/blob/0f3fa2f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala
index f1fad77..5ecb77b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala
@@ -68,7 +68,7 @@ abstract class AverageLike(child: Expression) extends 
DeclarativeAggregate {
 Add(
   sum,
   coalesce(child.cast(sumDataType), Literal(0).cast(sumDataType))),
-/* count = */ If(IsNull(child), count, count + 1L)
+/* count = */ If(child.isNull, count, count + 1L)
   )
 
   override lazy val updateExpressions = updateExpressionsDef

http://git-wip-us.apache.org/repos/asf/spark/blob/0f3fa2f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
index 6bbb083..e2ff0ef 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
@@ -75,7 +75,7 @@ abstract class CentralMomentAgg(child: Expression)
 val n2 = n.right
 val newN = n1 + n2
 val delta = avg.right - avg.left
-val deltaN = If(newN === Literal(0.0), Literal(0.0), delta / newN)
+val deltaN = If(newN === 0.0, 0.0, delta / newN)
 val newAvg = avg.left + deltaN * n2
 
 // higher order moments computed according to:
@@ -102,7 +102,7 @@ abstract class CentralMomentAgg(child: Expression)
   }
 
   prot

spark git commit: [SPARK-25036][SQL] Should compare ExprValue.isNull with LiteralTrue/LiteralFalse

2018-08-06 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 87ca7396c -> 408a3ff2c


[SPARK-25036][SQL] Should compare ExprValue.isNull with LiteralTrue/LiteralFalse

## What changes were proposed in this pull request?

This PR fixes comparisons of `ExprValue.isNull` with `String`. 
`ExprValue.isNull` should be compared with the `TrueLiteral` or `FalseLiteral` singletons instead.

The `String` comparison causes the following compilation errors when using scala-2.12 with sbt. In 
addition, this code may also generate incorrect code in Spark 2.3.

```
/home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:94:
 org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are 
unrelated: they will most likely always compare unequal
[error] [warn] if (eval.isNull != "true") {
[error] [warn]
[error] [warn] 
/home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:126:
 org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are 
unrelated: they will most likely never compare equal
[error] [warn]  if (eval.isNull == "true") {
[error] [warn]
[error] [warn] 
/home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:133:
 org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are 
unrelated: they will most likely never compare equal
[error] [warn] if (eval.isNull == "true") {
[error] [warn]
[error] [warn] 
/home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala:90:
 org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are 
unrelated: they will most likely never compare equal
[error] [warn]   if (inputs.map(_.isNull).forall(_ == "false")) {
[error] [warn]
```
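
To make the failure mode concrete, a small self-contained model of the bug (the types below are stand-ins, not the actual Catalyst classes): `isNull` used to be a `String` but is now a typed `ExprValue`, so a comparison against `"true"`/`"false"` can never succeed and silently disables the intended fast path.

```
sealed trait ExprValue
case object TrueLiteral extends ExprValue
case object FalseLiteral extends ExprValue

object IsNullComparison extends App {
  val inputs: Seq[ExprValue] = Seq(FalseLiteral, FalseLiteral)

  // Broken: an ExprValue is never equal to a String, so this is always false.
  val brokenCheck = inputs.forall(_ == "false")

  // Fixed (mirroring the diff below): compare against the literal singletons.
  val fixedCheck = inputs.forall(_ == FalseLiteral)

  println((brokenCheck, fixedCheck))  // (false,true)
}
```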

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki 

Closes #22012 from kiszk/SPARK-25036a.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/408a3ff2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/408a3ff2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/408a3ff2

Branch: refs/heads/master
Commit: 408a3ff2c484fba5734c03dbc570b654dcbc1f23
Parents: 87ca739
Author: Kazuaki Ishizaki 
Authored: Mon Aug 6 19:43:21 2018 -0400
Committer: Xiao Li 
Committed: Mon Aug 6 19:43:21 2018 -0400

--
 .../expressions/codegen/GenerateUnsafeProjection.scala | 2 +-
 .../spark/sql/catalyst/expressions/stringExpressions.scala | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/408a3ff2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index 8f2a5a0..998a675 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -87,7 +87,7 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
   // For top level row writer, it always writes to the beginning of the 
global buffer holder,
   // which means its fixed-size region always in the same position, so we 
don't need to call
   // `reset` to set up its fixed-size region every time.
-  if (inputs.map(_.isNull).forall(_ == "false")) {
+  if (inputs.map(_.isNull).forall(_ == FalseLiteral)) {
 // If all fields are not nullable, which means the null bits never 
changes, then we don't
 // need to clear it out every time.
 ""

http://git-wip-us.apache.org/repos/asf/spark/blob/408a3ff2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 1838b9f..e1549d3 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -91,7 +91,7 @@ case class ConcatWs(children: Seq[Expression])
   val args = ctx.fres

svn commit: r28584 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_06_16_02-87ca739-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-06 Thread pwendell
Author: pwendell
Date: Mon Aug  6 23:16:08 2018
New Revision: 28584

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_06_16_02-87ca739 docs


[This commit notification would consist of 1470 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-24161][SS] Enable debug package feature on structured streaming

2018-08-06 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 3c96937c7 -> 87ca7396c


[SPARK-24161][SS] Enable debug package feature on structured streaming

## What changes were proposed in this pull request?

Currently, the debug package has an implicit class "DebugQuery" which wraps 
Dataset to provide debug features on the Dataset class. It doesn't work with 
Structured Streaming: it requires the query to be already started, and the information 
has to be retrieved from the StreamingQuery, not the Dataset. I guess that's why "explain" 
had to be placed on StreamingQuery even though it already exists on Dataset.

This patch adds a new implicit class "DebugStreamQuery" which wraps 
StreamingQuery to provide similar debug features on the StreamingQuery class.

## How was this patch tested?

Added relevant unit tests.

Author: Jungtaek Lim 

Closes #21222 from HeartSaVioR/SPARK-24161.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/87ca7396
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/87ca7396
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/87ca7396

Branch: refs/heads/master
Commit: 87ca7396c7b21a87874d8ceb32e53119c609002c
Parents: 3c96937
Author: Jungtaek Lim 
Authored: Mon Aug 6 15:23:47 2018 -0700
Committer: Shixiong Zhu 
Committed: Mon Aug 6 15:23:47 2018 -0700

--
 .../spark/sql/execution/debug/package.scala |  59 +-
 .../spark/sql/streaming/StreamSuite.scala   | 116 +++
 2 files changed, 173 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/87ca7396/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index a717cbd..366e1fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -29,6 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, 
CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.execution.streaming.{StreamExecution, 
StreamingQueryWrapper}
+import 
org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec
+import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
 
 /**
@@ -40,6 +43,16 @@ import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
  *   sql("SELECT 1").debug()
  *   sql("SELECT 1").debugCodegen()
  * }}}
+ *
+ * or for streaming case (structured streaming):
+ * {{{
+ *   import org.apache.spark.sql.execution.debug._
+ *   val query = df.writeStream.<...>.start()
+ *   query.debugCodegen()
+ * }}}
+ *
+ * Note that debug in structured streaming is not supported, because it 
doesn't make sense for
+ * streaming to execute batch once while main query is running concurrently.
  */
 package object debug {
 
@@ -89,13 +102,49 @@ package object debug {
   }
 
   /**
+   * Get WholeStageCodegenExec subtrees and the codegen in a query plan into 
one String
+   *
+   * @param query the streaming query for codegen
+   * @return single String containing all WholeStageCodegen subtrees and 
corresponding codegen
+   */
+  def codegenString(query: StreamingQuery): String = {
+val w = asStreamExecution(query)
+if (w.lastExecution != null) {
+  codegenString(w.lastExecution.executedPlan)
+} else {
+  "No physical plan. Waiting for data."
+}
+  }
+
+  /**
+   * Get WholeStageCodegenExec subtrees and the codegen in a query plan
+   *
+   * @param query the streaming query for codegen
+   * @return Sequence of WholeStageCodegen subtrees and corresponding codegen
+   */
+  def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
+val w = asStreamExecution(query)
+if (w.lastExecution != null) {
+  codegenStringSeq(w.lastExecution.executedPlan)
+} else {
+  Seq.empty
+}
+  }
+
+  private def asStreamExecution(query: StreamingQuery): StreamExecution = 
query match {
+case wrapper: StreamingQueryWrapper => wrapper.streamingQuery
+case q: StreamExecution => q
+case _ => throw new IllegalArgumentException("Parameter should be an 
instance of " +
+  "StreamExecution!")
+  }
+
+  /**
* Augments [[Dataset]]s with debug methods.
*/
   implicit class DebugQuery(query: Dataset[_]) extends Logging {
 def debug(): Unit = {
-  val plan = query.queryExecution.executedPlan
   val visited = new col

spark git commit: [SPARK-24948][SHS] Delegate check access permissions to the file system

2018-08-06 Thread mridulm80
Repository: spark
Updated Branches:
  refs/heads/master 278984d5a -> 3c96937c7


[SPARK-24948][SHS] Delegate check access permissions to the file system

## What changes were proposed in this pull request?

In `SparkHadoopUtil.checkAccessPermission`, we consider only basic 
permissions in order to check whether a user can access a file or not. This is 
not a complete check, as it ignores ACLs and other policies a file system may 
apply internally. So it can wrongly report that a user 
cannot access a file even though they actually can.

The PR proposes to delegate the check of whether a file is 
accessible to the file system itself, in order to return the right result. A caching layer is 
added for performance reasons.
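
A hedged sketch of the delegation idea, using Hadoop's `FileSystem.access` (which honors ACLs and throws `AccessControlException` when access is denied); the caching layer added in `FsHistoryProvider` is omitted here:

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.security.AccessControlException

object AccessCheck {
  // Ask the file system itself whether the current user may read the path,
  // instead of re-implementing the owner/group/other permission logic.
  def canRead(fs: FileSystem, path: Path): Boolean = {
    try {
      fs.access(path, FsAction.READ)
      true
    } catch {
      case _: AccessControlException => false
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    println(canRead(fs, new Path("/tmp")))
  }
}
```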

## How was this patch tested?

modified UTs

Author: Marco Gaido 

Closes #21895 from mgaido91/SPARK-24948.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c96937c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c96937c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c96937c

Branch: refs/heads/master
Commit: 3c96937c7b1d7a010b630f4b98fd22dafc37808b
Parents: 278984d
Author: Marco Gaido 
Authored: Mon Aug 6 14:29:05 2018 -0700
Committer: Mridul Muralidharan 
Committed: Mon Aug 6 14:29:05 2018 -0700

--
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 23 -
 .../deploy/history/FsHistoryProvider.scala  | 67 ++
 .../spark/deploy/SparkHadoopUtilSuite.scala | 97 
 .../deploy/history/FsHistoryProviderSuite.scala | 42 -
 4 files changed, 89 insertions(+), 140 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3c96937c/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 8353e64..70a8c65 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -31,7 +31,6 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
-import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -367,28 +366,6 @@ class SparkHadoopUtil extends Logging {
 buffer.toString
   }
 
-  private[spark] def checkAccessPermission(status: FileStatus, mode: 
FsAction): Boolean = {
-val perm = status.getPermission
-val ugi = UserGroupInformation.getCurrentUser
-
-if (ugi.getShortUserName == status.getOwner) {
-  if (perm.getUserAction.implies(mode)) {
-return true
-  }
-} else if (ugi.getGroupNames.contains(status.getGroup)) {
-  if (perm.getGroupAction.implies(mode)) {
-return true
-  }
-} else if (perm.getOtherAction.implies(mode)) {
-  return true
-}
-
-logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
-  s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
-  s"${if (status.isDirectory) "d" else "-"}$perm")
-false
-  }
-
   def serialize(creds: Credentials): Array[Byte] = {
 val byteStream = new ByteArrayOutputStream
 val dataStream = new DataOutputStream(byteStream)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c96937c/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index bf1eeb0..44d2390 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -21,11 +21,12 @@ import java.io.{File, FileNotFoundException, IOException}
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermissions
 import java.util.{Date, ServiceLoader}
-import java.util.concurrent.{ExecutorService, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, 
TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.ExecutionException
 import scala.io.Source
 import scala.util.Try
 import scala.xml.Node
@@ -33,8 +34,7 @@ import scala.xml.Node
 import com.fasterxml.jackson.annotation.JsonI

svn commit: r28578 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_06_12_01-278984d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-06 Thread pwendell
Author: pwendell
Date: Mon Aug  6 19:16:08 2018
New Revision: 28578

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_06_12_01-278984d docs


[This commit notification would consist of 1470 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-25019][BUILD] Fix orc dependency to use the same exclusion rules

2018-08-06 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 51e2b38d9 -> 278984d5a


[SPARK-25019][BUILD] Fix orc dependency to use the same exclusion rules

## What changes were proposed in this pull request?

While upgrading Apache ORC to 1.5.2 
([SPARK-24576](https://issues.apache.org/jira/browse/SPARK-24576)), the `sql/core` 
module overrode the exclusion rules of the parent pom file, which causes the published 
`spark-sql_2.1X` artifacts to have incomplete exclusion rules 
([SPARK-25019](https://issues.apache.org/jira/browse/SPARK-25019)). This PR 
fixes it by moving the newly added exclusion rule to the parent pom. This also 
fixes the sbt build hack introduced at that time.

## How was this patch tested?

Pass the existing dependency check and the tests.

Author: Dongjoon Hyun 

Closes #22003 from dongjoon-hyun/SPARK-25019.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/278984d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/278984d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/278984d5

Branch: refs/heads/master
Commit: 278984d5a5e56136c9f940f2d0e3d2040fad180b
Parents: 51e2b38
Author: Dongjoon Hyun 
Authored: Mon Aug 6 12:00:39 2018 -0700
Committer: Yin Huai 
Committed: Mon Aug 6 12:00:39 2018 -0700

--
 pom.xml  |  4 
 sql/core/pom.xml | 28 
 2 files changed, 4 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/278984d5/pom.xml
--
diff --git a/pom.xml b/pom.xml
index c46eb31..8abdb70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1744,6 +1744,10 @@
 hadoop-common
   
   
+org.apache.hadoop
+hadoop-hdfs
+  
+  
 org.apache.hive
 hive-storage-api
   

http://git-wip-us.apache.org/repos/asf/spark/blob/278984d5/sql/core/pom.xml
--
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 68b42a4..ba17f5f 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -90,39 +90,11 @@
   org.apache.orc
   orc-core
   ${orc.classifier}
-  
-
-  org.apache.hadoop
-  hadoop-hdfs
-
-
-
-  org.apache.hive
-  hive-storage-api
-
-  
 
 
   org.apache.orc
   orc-mapreduce
   ${orc.classifier}
-  
-
-  org.apache.hadoop
-  hadoop-hdfs
-
-
-
-  org.apache.hive
-  hive-storage-api
-
-  
 
 
   org.apache.parquet





spark git commit: [SPARK-24992][CORE] spark should randomize yarn local dir selection

2018-08-06 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master 1a5e46076 -> 51e2b38d9


[SPARK-24992][CORE] spark should randomize yarn local dir selection

**Description: 
[SPARK-24992](https://issues.apache.org/jira/browse/SPARK-24992)**
Utils.getLocalDir is used to get the path of a temporary directory. However, it 
always returns the same directory, which is the first element in the array 
localRootDirs. When running on YARN, this can mean that we always 
write to one disk, which makes it busy while other disks are free. We should 
randomize the selection to spread out the load.

**What changes were proposed in this pull request?**
This PR randomizes the selection of the local directory inside the method 
Utils.getLocalDir. This change affects the Utils.fetchFile method, since it 
relied on the fact that Utils.getLocalDir always returns the same directory for 
caching files. Therefore, a new variable, cachedLocalDir, is used to cache the first 
local directory obtained from Utils.getLocalDir. Also, when getting the 
configured local directories (inside Utils.getConfiguredLocalDirs) in YARN mode, 
the array of directories is randomized before being returned.

Author: Hieu Huynh <“hieu.hu...@oath.com”>

Closes #21953 from hthuynh2/SPARK_24992.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51e2b38d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51e2b38d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51e2b38d

Branch: refs/heads/master
Commit: 51e2b38d93df8cb0cc151d5e68a2190eab52644c
Parents: 1a5e460
Author: Hieu Huynh <“hieu.hu...@oath.com”>
Authored: Mon Aug 6 13:58:28 2018 -0500
Committer: Thomas Graves 
Committed: Mon Aug 6 13:58:28 2018 -0500

--
 .../scala/org/apache/spark/util/Utils.scala | 21 
 1 file changed, 17 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/51e2b38d/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a6fd363..7ec707d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -83,6 +83,7 @@ private[spark] object Utils extends Logging {
   val random = new Random()
 
   private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
+  @volatile private var cachedLocalDir: String = ""
 
   /**
* Define a default value for driver memory here since this value is 
referenced across the code
@@ -462,7 +463,15 @@ private[spark] object Utils extends Logging {
 if (useCache && fetchCacheEnabled) {
   val cachedFileName = s"${url.hashCode}${timestamp}_cache"
   val lockFileName = s"${url.hashCode}${timestamp}_lock"
-  val localDir = new File(getLocalDir(conf))
+  // Set the cachedLocalDir for the first time and re-use it later
+  if (cachedLocalDir.isEmpty) {
+this.synchronized {
+  if (cachedLocalDir.isEmpty) {
+cachedLocalDir = getLocalDir(conf)
+  }
+}
+  }
+  val localDir = new File(cachedLocalDir)
   val lockFile = new File(localDir, lockFileName)
   val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
   // Only one executor entry.
@@ -767,13 +776,17 @@ private[spark] object Utils extends Logging {
*   - Otherwise, this will return java.io.tmpdir.
*
* Some of these configuration options might be lists of multiple paths, but 
this method will
-   * always return a single directory.
+   * always return a single directory. The return directory is chosen randomly 
from the array
+   * of directories it gets from getOrCreateLocalRootDirs.
*/
   def getLocalDir(conf: SparkConf): String = {
-getOrCreateLocalRootDirs(conf).headOption.getOrElse {
+val localRootDirs = getOrCreateLocalRootDirs(conf)
+if (localRootDirs.isEmpty) {
   val configuredLocalDirs = getConfiguredLocalDirs(conf)
   throw new IOException(
 s"Failed to get a temp directory under 
[${configuredLocalDirs.mkString(",")}].")
+} else {
+  localRootDirs(scala.util.Random.nextInt(localRootDirs.length))
 }
   }
 
@@ -815,7 +828,7 @@ private[spark] object Utils extends Logging {
   // to what Yarn on this system said was available. Note this assumes 
that Yarn has
   // created the directories already, and that they are secured so that 
only the
   // user has access to them.
-  getYarnLocalDirs(conf).split(",")
+  randomizeInPlace(getYarnLocalDirs(conf).split(","))
 } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
   conf.getenv("SPARK_EXECUTOR

svn commit: r28575 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_06_08_01-1a5e460-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-06 Thread pwendell
Author: pwendell
Date: Mon Aug  6 15:16:12 2018
New Revision: 28575

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_06_08_01-1a5e460 docs


[This commit notification would consist of 1470 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-23913][SQL] Add array_intersect function

2018-08-06 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master 35700bb7f -> 1a5e46076


[SPARK-23913][SQL] Add array_intersect function

## What changes were proposed in this pull request?

The PR adds the SQL function `array_intersect`. The behavior of the function is 
based on Presto's.

This function returns an array of the elements in the intersection of 
array1 and array2.

Note: The order of elements in the result is not defined.
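
A usage sketch from the Scala API, mirroring the Python doctest in the diff below (the column names are made up):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.array_intersect

object ArrayIntersectDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("array_intersect").getOrCreate()
    import spark.implicits._

    val df = Seq((Seq("b", "a", "c"), Seq("c", "d", "a", "f"))).toDF("c1", "c2")
    df.select(array_intersect($"c1", $"c2")).show()
    // The intersection contains "a" and "c"; the element order is not defined.
    spark.stop()
  }
}
```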

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki 

Closes #21102 from kiszk/SPARK-23913.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a5e4607
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a5e4607
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a5e4607

Branch: refs/heads/master
Commit: 1a5e460762593c61b7ff2c5f3641d406706616ff
Parents: 35700bb
Author: Kazuaki Ishizaki 
Authored: Mon Aug 6 23:27:57 2018 +0900
Committer: Takuya UESHIN 
Committed: Mon Aug 6 23:27:57 2018 +0900

--
 python/pyspark/sql/functions.py |  19 +
 .../catalyst/analysis/FunctionRegistry.scala|   1 +
 .../expressions/collectionOperations.scala  | 386 +++
 .../CollectionExpressionsSuite.scala| 112 ++
 .../scala/org/apache/spark/sql/functions.scala  |  11 +
 .../spark/sql/DataFrameFunctionsSuite.scala |  54 +++
 6 files changed, 515 insertions(+), 68 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1a5e4607/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index ec014a5..eaecf28 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2035,6 +2035,25 @@ def array_distinct(col):
 
 @ignore_unicode_prefix
 @since(2.4)
+def array_intersect(col1, col2):
+"""
+Collection function: returns an array of the elements in the intersection 
of col1 and col2,
+without duplicates.
+
+:param col1: name of column containing array
+:param col2: name of column containing array
+
+>>> from pyspark.sql import Row
+>>> df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2=["c", "d", "a", 
"f"])])
+>>> df.select(array_intersect(df.c1, df.c2)).collect()
+[Row(array_intersect(c1, c2)=[u'a', u'c'])]
+"""
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.array_intersect(_to_java_column(col1), 
_to_java_column(col2)))
+
+
+@ignore_unicode_prefix
+@since(2.4)
 def array_union(col1, col2):
 """
 Collection function: returns an array of the elements in the union of col1 
and col2,

http://git-wip-us.apache.org/repos/asf/spark/blob/1a5e4607/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 35f8de1..ed2f67d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -411,6 +411,7 @@ object FunctionRegistry {
 expression[CreateArray]("array"),
 expression[ArrayContains]("array_contains"),
 expression[ArraysOverlap]("arrays_overlap"),
+expression[ArrayIntersect]("array_intersect"),
 expression[ArrayJoin]("array_join"),
 expression[ArrayPosition]("array_position"),
 expression[ArraySort]("array_sort"),

http://git-wip-us.apache.org/repos/asf/spark/blob/1a5e4607/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 3f94f25..e385c2d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -3651,7 +3651,7 @@ case class ArrayDistinct(child: Expression)
 }
 
 /**
- * Will become common base class for [[ArrayUnion]], ArrayIntersect, and 
[[ArrayExcept]].
+ * Will become common base class for [[ArrayUnion]], [[ArrayIntersect]], and 
[[ArrayExcept]].
  */
 abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
   override def checkInputDataTypes(): TypeCheckResult = {
@@ -3672,6 +3672,75 @@ abstract c

spark git commit: [SPARK-24981][CORE] ShutdownHook timeout causes job to fail when succeeded when SparkContext stop() not called by user program

2018-08-06 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master c1760da5d -> 35700bb7f


[SPARK-24981][CORE] ShutdownHook timeout causes job to fail when succeeded when 
SparkContext stop() not called by user program

**Description**
The issue is described in 
[SPARK-24981](https://issues.apache.org/jira/browse/SPARK-24981).

**How does this PR fix the issue?**
This PR catches the exception that is thrown while SparkContext.stop() is 
running (when it is called by the ShutdownHookManager).

**How was this patch tested?**
I manually tested it by adding a delay (60s) inside stop(). This makes the 
ShutdownHookManager interrupt the thread that is running stop(). The 
InterruptedException was caught and the job succeeded.

Author: Hieu Huynh <“hieu.hu...@oath.com”>
Author: Hieu Tri Huynh 

Closes #21936 from hthuynh2/SPARK_24981.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35700bb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35700bb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35700bb7

Branch: refs/heads/master
Commit: 35700bb7f2e3008ff781a1b3a1da8147d26371be
Parents: c1760da
Author: Hieu Huynh <“hieu.hu...@oath.com”>
Authored: Mon Aug 6 09:01:51 2018 -0500
Committer: Thomas Graves 
Committed: Mon Aug 6 09:01:51 2018 -0500

--
 core/src/main/scala/org/apache/spark/SparkContext.scala | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/35700bb7/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 03e91cd..e8bacee 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -571,7 +571,12 @@ class SparkContext(config: SparkConf) extends Logging {
 _shutdownHookRef = ShutdownHookManager.addShutdownHook(
   ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
   logInfo("Invoking stop() from shutdown hook")
-  stop()
+  try {
+stop()
+  } catch {
+case e: Throwable =>
+  logWarning("Ignoring Exception while stopping SparkContext from 
shutdown hook", e)
+  }
 }
   } catch {
 case NonFatal(e) =>





svn commit: r28572 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_06_06_39-c1760da-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-08-06 Thread pwendell
Author: pwendell
Date: Mon Aug  6 13:59:00 2018
New Revision: 28572

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_08_06_06_39-c1760da docs


[This commit notification would consist of 1470 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-25025][SQL] Remove the default value of isAll in INTERSECT/EXCEPT

2018-08-06 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master d063e3a47 -> c1760da5d


[SPARK-25025][SQL] Remove the default value of isAll in INTERSECT/EXCEPT

## What changes were proposed in this pull request?

Having a default value for isAll in the logical plan nodes INTERSECT/EXCEPT 
could introduce bugs when callers are not aware of it. This PR removes the 
default value and makes callers specify it explicitly.
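
For illustration only, a sketch built on the catalyst DSL touched below (the relation names are made up): with the default gone, every call site has to state whether it wants set or multiset semantics, mirroring EXCEPT vs. EXCEPT ALL in SQL.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.dsl.plans._

// Two unresolved relations stand in for arbitrary child plans.
val left  = UnresolvedRelation(TableIdentifier("t1"))
val right = UnresolvedRelation(TableIdentifier("t2"))

// Previously `left.except(right)` silently meant isAll = false; now the intent is explicit.
val exceptDistinct = left.except(right, isAll = false)     // EXCEPT
val exceptAll      = left.except(right, isAll = true)      // EXCEPT ALL
val intersectAll   = left.intersect(right, isAll = true)   // INTERSECT ALL
```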

## How was this patch tested?
This is a refactoring change. Existing tests already cover the functionality.

Author: Dilip Biswal 

Closes #22000 from dilipbiswal/SPARK-25025.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1760da5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1760da5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1760da5

Branch: refs/heads/master
Commit: c1760da5dd5576c52be4f9dd9ecd06589a6153e4
Parents: d063e3a
Author: Dilip Biswal 
Authored: Mon Aug 6 06:56:36 2018 -0400
Committer: Xiao Li 
Committed: Mon Aug 6 06:56:36 2018 -0400

--
 .../apache/spark/sql/catalyst/dsl/package.scala |  4 ++--
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  6 ++---
 .../plans/logical/basicLogicalOperators.scala   |  4 ++--
 .../catalyst/analysis/AnalysisErrorSuite.scala  | 12 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala   |  6 ++---
 .../catalyst/analysis/TypeCoercionSuite.scala   | 24 +---
 .../analysis/UnsupportedOperationsSuite.scala   |  4 ++--
 .../catalyst/optimizer/ColumnPruningSuite.scala |  4 ++--
 .../optimizer/ReplaceOperatorSuite.scala| 16 ++---
 .../sql/catalyst/parser/PlanParserSuite.scala   | 21 +
 .../plans/ConstraintPropagationSuite.scala  |  4 ++--
 .../scala/org/apache/spark/sql/Dataset.scala|  4 ++--
 12 files changed, 60 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c1760da5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 7997e79..75387fa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -356,10 +356,10 @@ package object dsl {
 
   def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, 
logicalPlan)
 
-  def except(otherPlan: LogicalPlan, isAll: Boolean = false): LogicalPlan =
+  def except(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan =
 Except(logicalPlan, otherPlan, isAll)
 
-  def intersect(otherPlan: LogicalPlan, isAll: Boolean = false): 
LogicalPlan =
+  def intersect(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan =
 Intersect(logicalPlan, otherPlan, isAll)
 
   def union(otherPlan: LogicalPlan): LogicalPlan = Union(logicalPlan, 
otherPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/c1760da5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 9906a30..732d762 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -534,15 +534,15 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
   case SqlBaseParser.INTERSECT if all =>
 Intersect(left, right, isAll = true)
   case SqlBaseParser.INTERSECT =>
-Intersect(left, right)
+Intersect(left, right, isAll = false)
   case SqlBaseParser.EXCEPT if all =>
 Except(left, right, isAll = true)
   case SqlBaseParser.EXCEPT =>
-Except(left, right)
+Except(left, right, isAll = false)
   case SqlBaseParser.SETMINUS if all =>
 Except(left, right, isAll = true)
   case SqlBaseParser.SETMINUS =>
-Except(left, right)
+Except(left, right, isAll = false)
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c1760da5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOpera

spark git commit: [SPARK-24940][SQL] Use IntegerLiteral in ResolveCoalesceHints

2018-08-06 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 64ad7b841 -> d063e3a47


[SPARK-24940][SQL] Use IntegerLiteral in ResolveCoalesceHints

## What changes were proposed in this pull request?

A follow-up to address an unmerged review comment.
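
For context, a sketch (not part of the diff) of what the extractor buys: `IntegerLiteral` matches a `Literal` whose value is an `Int` of `IntegerType`, so the hint parameter check collapses into a single pattern:

```scala
import org.apache.spark.sql.catalyst.expressions.{IntegerLiteral, Literal}

// The extractor folds the two-part check (value is an Int, data type is IntegerType)
// into one case, which is what the COALESCE/REPARTITION hint resolution now uses.
def numPartitionsOf(param: Any): Option[Int] = param match {
  case IntegerLiteral(n) => Some(n)   // e.g. Literal(10) in a hint expression
  case n: Int            => Some(n)   // raw Int parameters are still accepted
  case _                 => None
}

assert(numPartitionsOf(Literal(10)) == Some(10))
```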

## How was this patch tested?

Unit test ResolveHintsSuite.

Author: John Zhuge 

Closes #21998 from jzhuge/SPARK-24940.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d063e3a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d063e3a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d063e3a4

Branch: refs/heads/master
Commit: d063e3a478221c836a0aa74a69828a526a6207bb
Parents: 64ad7b8
Author: John Zhuge 
Authored: Mon Aug 6 06:41:55 2018 -0400
Committer: Xiao Li 
Committed: Mon Aug 6 06:41:55 2018 -0400

--
 .../org/apache/spark/sql/catalyst/analysis/ResolveHints.scala   | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d063e3a4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index 1ef482b..80d5105 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -20,12 +20,11 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.IntegerType
 
 
 /**
@@ -119,7 +118,7 @@ object ResolveHints {
   case "COALESCE" => false
 }
 val numPartitions = h.parameters match {
-  case Seq(Literal(numPartitions: Int, IntegerType)) =>
+  case Seq(IntegerLiteral(numPartitions)) =>
 numPartitions
   case Seq(numPartitions: Int) =>
 numPartitions





spark git commit: [SPARK-23772][FOLLOW-UP][SQL] Provide an option to ignore column of all null values or empty array during JSON schema inference

2018-08-06 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master ac527b520 -> 64ad7b841


[SPARK-23772][FOLLOW-UP][SQL] Provide an option to ignore column of all null 
values or empty array during JSON schema inference

## What changes were proposed in this pull request?

The `dropFieldIfAllNull` parameter of the `json` method wasn't set as an 
option. This PR fixes that.
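
The keyword argument just has to be forwarded to the underlying JSON reader option, the same one the Scala API already accepts; a sketch of the equivalent Scala usage (the path is hypothetical):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("dropFieldIfAllNull").getOrCreate()

// With the option set, columns that are null (or an empty array) in every
// record are dropped from the inferred schema.
val df = spark.read
  .option("dropFieldIfAllNull", "true")
  .json("/tmp/example.json")   // hypothetical path

df.printSchema()
```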

## How was this patch tested?

I added a test to `python/pyspark/sql/tests.py`

Author: Maxim Gekk 

Closes #22002 from MaxGekk/drop-field-if-all-null.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64ad7b84
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64ad7b84
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64ad7b84

Branch: refs/heads/master
Commit: 64ad7b841d1efa979041358ee2a19aea7382d737
Parents: ac527b52
Author: Maxim Gekk 
Authored: Mon Aug 6 16:46:55 2018 +0800
Committer: hyukjinkwon 
Committed: Mon Aug 6 16:46:55 2018 +0800

--
 python/pyspark/sql/readwriter.py |  2 +-
 python/pyspark/sql/tests.py  | 16 
 2 files changed, 17 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/64ad7b84/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 98b2cd9..abf878a 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -267,7 +267,7 @@ class DataFrameReader(OptionUtils):
 mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
dateFormat=dateFormat,
 timestampFormat=timestampFormat, multiLine=multiLine,
 allowUnquotedControlChars=allowUnquotedControlChars, 
lineSep=lineSep,
-samplingRatio=samplingRatio, encoding=encoding)
+samplingRatio=samplingRatio, 
dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding)
 if isinstance(path, basestring):
 path = [path]
 if type(path) == list:

http://git-wip-us.apache.org/repos/asf/spark/blob/64ad7b84/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a294d70..ed97a63 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3351,6 +3351,22 @@ class SQLTests(ReusedSQLTestCase):
 finally:
 shutil.rmtree(path)
 
+def test_ignore_column_of_all_nulls(self):
+path = tempfile.mkdtemp()
+shutil.rmtree(path)
+try:
+df = self.spark.createDataFrame([["""{"a":null, "b":1, 
"c":3.0}"""],
+ ["""{"a":null, "b":null, 
"c":"string"}"""],
+ ["""{"a":null, "b":null, 
"c":null}"""]])
+df.write.text(path)
+schema = StructType([
+StructField('b', LongType(), nullable=True),
+StructField('c', StringType(), nullable=True)])
+readback = self.spark.read.json(path, dropFieldIfAllNull=True)
+self.assertEquals(readback.schema, schema)
+finally:
+shutil.rmtree(path)
+
 def test_repr_behaviors(self):
 import re
 pattern = re.compile(r'^ *\|', re.MULTILINE)





spark git commit: [SPARK-24991][SQL] use InternalRow in DataSourceWriter

2018-08-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 327bb3007 -> ac527b520


[SPARK-24991][SQL] use InternalRow in DataSourceWriter

## What changes were proposed in this pull request?

A follow-up to #21118

Since we use `InternalRow` in the read API of data source v2, we should do the 
same thing for the write API.
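
For orientation, a sketch (not from the diff) of the format a writer now receives: `InternalRow` is catalyst's internal representation, so e.g. strings arrive as `UTF8String` rather than `java.lang.String`:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

// A Row holds external types (String, java.sql.Timestamp, ...);
// an InternalRow holds the internal encoding (UTF8String, Long, ...).
val record: InternalRow = InternalRow.fromSeq(Seq(UTF8String.fromString("key"), 42))

// Access is positional and typed, which is what a DataWriter[InternalRow]
// does when serializing records for the sink.
val key: UTF8String = record.getUTF8String(0)
val count: Int      = record.getInt(1)
```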

## How was this patch tested?

existing tests.

Author: Wenchen Fan 

Closes #21948 from cloud-fan/row-write.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac527b52
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac527b52
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac527b52

Branch: refs/heads/master
Commit: ac527b5205ec2826677e2b7ad0d424aa976bce81
Parents: 327bb30
Author: Wenchen Fan 
Authored: Mon Aug 6 15:52:01 2018 +0800
Committer: Wenchen Fan 
Committed: Mon Aug 6 15:52:01 2018 +0800

--
 .../spark/sql/kafka010/KafkaStreamWriter.scala  |  4 +-
 .../sql/sources/v2/writer/DataSourceWriter.java |  4 +-
 .../spark/sql/sources/v2/writer/DataWriter.java |  4 +-
 .../sources/v2/writer/DataWriterFactory.java|  5 +-
 .../v2/writer/SupportsWriteInternalRow.java | 41 ---
 .../datasources/v2/WriteToDataSourceV2.scala| 30 +---
 .../streaming/MicroBatchExecution.scala | 10 +--
 .../continuous/ContinuousWriteRDD.scala |  6 +-
 .../WriteToContinuousDataSourceExec.scala   | 12 +---
 .../streaming/sources/ConsoleWriter.scala   | 11 ++-
 .../sources/ForeachWriterProvider.scala | 10 +--
 .../streaming/sources/MicroBatchWriter.scala| 21 +-
 .../sources/PackedRowWriterFactory.scala| 15 ++--
 .../execution/streaming/sources/memoryV2.scala  | 33 +
 .../execution/streaming/MemorySinkV2Suite.scala | 18 +++--
 .../sql/sources/v2/DataSourceV2Suite.scala  |  7 --
 .../sources/v2/SimpleWritableDataSource.scala   | 72 ++--
 17 files changed, 73 insertions(+), 230 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ac527b52/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
index 32923dc..5f0802b 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -42,11 +42,11 @@ case object KafkaWriterCommitMessage extends 
WriterCommitMessage
  */
 class KafkaStreamWriter(
 topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
-  extends StreamWriter with SupportsWriteInternalRow {
+  extends StreamWriter {
 
   validateQuery(schema.toAttributes, producerParams.toMap[String, 
Object].asJava, topic)
 
-  override def createInternalRowWriterFactory(): KafkaStreamWriterFactory =
+  override def createWriterFactory(): KafkaStreamWriterFactory =
 KafkaStreamWriterFactory(topic, producerParams, schema)
 
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}

http://git-wip-us.apache.org/repos/asf/spark/blob/ac527b52/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
index 7eedc85..385fc29 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.sources.v2.writer;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.StreamWriteSupport;
 import org.apache.spark.sql.sources.v2.WriteSupport;
@@ -61,7 +61,7 @@ public interface DataSourceWriter {
* If this method fails (by throwing an exception), the action will fail and 
no Spark job will be
* submitted.
*/
-  DataWriterFactory createWriterFactory();
+  DataWriterFactory createWriterFactory();
 
   /**
* Returns whether Spark should use the commit coordinator to ensure that at 
most one task for

http://git-wip-us.apache.org/repos/asf/spark/blob/ac527b52/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/