spark git commit: [SPARK-25010][SQL] Rand/Randn should produce different values for each execution in streaming query
Repository: spark Updated Branches: refs/heads/master 4446a0b0d -> 43763629f [SPARK-25010][SQL] Rand/Randn should produce different values for each execution in streaming query ## What changes were proposed in this pull request? Like Uuid before SPARK-24896, Rand and Randn expressions currently produce the same results for each execution of a streaming query. That doesn't make much sense for streaming queries. We should make them produce different results, as Uuid now does. In this change, similar to Uuid, we assign new random seeds to Rand/Randn when returning the optimized plan from `IncrementalExecution`. Note: unlike Uuid, Rand/Randn can be created with an initial seed. Because we replace that seed at `IncrementalExecution`, it is no longer used. For now this doesn't seem to be a big issue for streaming queries, but it needs confirmation from others. cc zsxwing cloud-fan ## How was this patch tested? Added test. Closes #21980 from viirya/SPARK-25010. Authored-by: Liang-Chi Hsieh Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43763629 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43763629 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43763629 Branch: refs/heads/master Commit: 43763629f1d1a220cd91e2aed89152d065dfba24 Parents: 4446a0b Author: Liang-Chi Hsieh Authored: Tue Aug 7 14:28:14 2018 +0800 Committer: Wenchen Fan Committed: Tue Aug 7 14:28:14 2018 +0800 -- .../spark/sql/catalyst/expressions/misc.scala | 5 - .../expressions/randomExpressions.scala | 16 -- .../streaming/IncrementalExecution.scala| 10 +++-- .../sql/streaming/StreamingQuerySuite.scala | 22 +++- 4 files changed, 42 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/43763629/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index 5d98dac..0cdeda9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -126,10 +126,13 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable { """, note = "The function is non-deterministic.") // scalastyle:on line.size.limit -case class Uuid(randomSeed: Option[Long] = None) extends LeafExpression with Stateful +case class Uuid(randomSeed: Option[Long] = None) extends LeafExpression with Stateful +with ExpressionWithRandomSeed { def this() = this(None) + override def withNewSeed(seed: Long): Uuid = Uuid(Some(seed)) + override lazy val resolved: Boolean = randomSeed.isDefined override def nullable: Boolean = false http://git-wip-us.apache.org/repos/asf/spark/blob/43763629/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala index 926c2f0..b70c341 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala @@ -57,6 +57,14 @@ abstract class RDG extends UnaryExpression with ExpectsInputTypes with Stateful override
def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(IntegerType, LongType)) } +/** + * Represents the behavior of expressions which have a random seed and can renew the seed. + * Usually the random seed needs to be renewed at each execution under streaming queries. + */ +trait ExpressionWithRandomSeed { + def withNewSeed(seed: Long): Expression +} + /** Generate a random column with i.i.d. uniformly distributed values in [0, 1). */ // scalastyle:off line.size.limit @ExpressionDescription( @@ -72,10 +80,12 @@ abstract class RDG extends UnaryExpression with ExpectsInputTypes with Stateful """, note = "The function is non-deterministic in general case.") // scalastyle:on line.size.limit -case class Rand(child: Expression) extends RDG { +case class Rand(child: Expression) extends RDG with ExpressionWithRandomSeed { def this() = this(Literal(Utils.random.nextLong(), LongType)) + override def withNewSeed(seed: Long): Rand = Rand(Literal(seed, LongType)) + override protected def evalInternal(
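To make the re-seeding pattern concrete, here is a self-contained toy model of it — not the committed Spark code; the simplified `Rand` class and the `reseed` helper are illustrative assumptions that only mirror the `withNewSeed` contract shown in the diff above:

```
import scala.util.Random

// Mirrors the trait added in this patch (simplified).
trait ExpressionWithRandomSeed {
  def withNewSeed(seed: Long): ExpressionWithRandomSeed
}

// Stand-in for catalyst's Rand: the seed is carried in the expression itself.
case class Rand(seed: Long) extends ExpressionWithRandomSeed {
  override def withNewSeed(seed: Long): Rand = Rand(seed)
}

// What IncrementalExecution effectively does when returning the optimized plan:
// every seeded expression gets a brand-new seed, so each streaming execution
// draws from a different random stream.
def reseed(exprs: Seq[Any]): Seq[Any] = exprs.map {
  case e: ExpressionWithRandomSeed => e.withNewSeed(Random.nextLong())
  case other => other
}

val run1 = reseed(Seq(Rand(42L)))
val run2 = reseed(Seq(Rand(42L)))
// run1 and run2 almost surely carry different seeds, even though both started
// from the same user-supplied seed 42 -- which is why the initial seed is
// effectively ignored for streaming, as the description notes.
```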
svn commit: r28585 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_06_20_02-51bee7a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Aug 7 03:16:00 2018 New Revision: 28585 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_06_20_02-51bee7a docs [This commit notification would consist of 1473 parts, which exceeds the limit of 50, so it was shortened to the summary.]
spark git commit: [SPARK-23914][SQL][FOLLOW-UP] refactor ArrayUnion
Repository: spark Updated Branches: refs/heads/master 51bee7aca -> 4446a0b0d [SPARK-23914][SQL][FOLLOW-UP] refactor ArrayUnion ## What changes were proposed in this pull request? This PR refactors `ArrayUnion` based on [this suggestion](https://github.com/apache/spark/pull/21103#discussion_r205668821). 1. Generate optimized code for all of the primitive types except `boolean` 2. Generate code using `ArrayBuilder` or `ArrayBuffer` 3. Leave only a generic path in the interpreted path ## How was this patch tested? Existing tests Author: Kazuaki Ishizaki Closes #21937 from kiszk/SPARK-23914-follow. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4446a0b0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4446a0b0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4446a0b0 Branch: refs/heads/master Commit: 4446a0b0d9bd830f0e903d6780dedac4db572b5a Parents: 51bee7a Author: Kazuaki Ishizaki Authored: Tue Aug 7 12:07:56 2018 +0900 Committer: Takuya UESHIN Committed: Tue Aug 7 12:07:56 2018 +0900 -- .../expressions/collectionOperations.scala | 325 +++ .../CollectionExpressionsSuite.scala| 21 +- .../spark/sql/DataFrameFunctionsSuite.scala | 24 +- 3 files changed, 153 insertions(+), 217 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4446a0b0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index e385c2d..fbb1826 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -3767,230 +3767,159 @@ object ArraySetLike { """, since = "2.4.0") case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike -with ComplexTypeMergingExpression { - var hsInt: OpenHashSet[Int] = _ - var hsLong: OpenHashSet[Long] = _ - - def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { -val elem = array.getInt(idx) -if (!hsInt.contains(elem)) { - if (resultArray != null) { -resultArray.setInt(pos, elem) - } - hsInt.add(elem) - true -} else { - false -} - } - - def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { -val elem = array.getLong(idx) -if (!hsLong.contains(elem)) { - if (resultArray != null) { -resultArray.setLong(pos, elem) - } - hsLong.add(elem) - true -} else { - false -} - } + with ComplexTypeMergingExpression { - def evalIntLongPrimitiveType( - array1: ArrayData, - array2: ArrayData, - resultArray: ArrayData, - isLongType: Boolean): Int = { -// store elements into resultArray -var nullElementSize = 0 -var pos = 0 -Seq(array1, array2).foreach { array => - var i = 0 - while (i < array.numElements()) { -val size = if (!isLongType) hsInt.size else hsLong.size -if (size + nullElementSize > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { - ArraySetLike.throwUnionLengthOverflowException(size) -} -if (array.isNullAt(i)) { - if (nullElementSize == 0) { -if (resultArray != null) { - resultArray.setNullAt(pos) + @transient lazy val evalUnion: (ArrayData, ArrayData) => ArrayData = { +if (elementTypeSupportEquals) { + (array1, array2) =>
val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] +val hs = new OpenHashSet[Any] +var
foundNullElement = false +Seq(array1, array2).foreach { array => + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +arrayBuffer += null +foundNullElement = true + } +} else { + val elem = array.get(i, elementType) + if (!hs.contains(elem)) { +if (arrayBuffer.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + ArraySetLike.throwUnionLengthOverflowException(arrayBuffer.size) +} +arrayBuffer += elem +hs.add(elem) + } } -pos += 1 -nullElementSize = 1 +i += 1 } -} else { - val assigned = if (!isLongType) { -assignInt(array
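For reference, the generic union path in the diff boils down to this standalone sketch, with plain Scala collections standing in for Spark's `ArrayData` and `OpenHashSet` (the length-overflow check is omitted):

```
import scala.collection.mutable

def arrayUnion(a1: Seq[Any], a2: Seq[Any]): Seq[Any] = {
  val out = mutable.ArrayBuffer.empty[Any]
  val seen = mutable.HashSet.empty[Any]
  var foundNullElement = false
  (a1 ++ a2).foreach {
    case null =>
      // Emit at most one null, matching the semantics in the diff above.
      if (!foundNullElement) { out += null; foundNullElement = true }
    case elem =>
      // HashSet.add returns false when the element was already present.
      if (seen.add(elem)) out += elem
  }
  out.toSeq
}

// arrayUnion(Seq(1, 2, null), Seq(2, 3, null)) == Seq(1, 2, null, 3)
```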
spark git commit: [SPARK-25018][INFRA] Use `Co-authored-by` and `Signed-off-by` git trailer in `merge_spark_pr.py`
Repository: spark Updated Branches: refs/heads/master 18b6ec147 -> 51bee7aca [SPARK-25018][INFRA] Use `Co-authored-by` and `Signed-off-by` git trailer in `merge_spark_pr.py` ## What changes were proposed in this pull request? In the [Linux community](https://git.wiki.kernel.org/index.php/CommitMessageConventions), the `Co-authored-by` and `Signed-off-by` git trailers have been used for a while. Recently, GitHub adopted `Co-authored-by` to include the work of co-authors in the profile contributions graph and the repository's statistics. It's a convention for recognizing multiple authors, and can encourage people to collaborate in OSS communities. Git provides command line tools to read the metadata and learn who committed the code upstream, but it's not as easy as having `Signed-off-by` as part of the message, which lets developers more easily find the relevant committers who can help with a certain part of the codebase. For a single-author PR, I propose using `Authored-by` and `Signed-off-by`, so the message will look like ``` Authored-by: Author's name Signed-off-by: Committer's name ``` For a multi-author PR, I propose using `Lead-authored-by:` and `Co-authored-by:` for the lead author and co-authors. The message will look like ``` Lead-authored-by: Lead Author's name Co-authored-by: CoAuthor's name Signed-off-by: Committer's name ``` It's also useful to include `Reviewed-by:` to give credit to the people who participated in the code review. We can add this in the next iteration. Closes #21991 from dbtsai/script. Lead-authored-by: DB Tsai Co-authored-by: Liang-Chi Hsieh Co-authored-by: Brian Lindblom Co-authored-by: hyukjinkwon Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51bee7ac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51bee7ac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51bee7ac Branch: refs/heads/master Commit: 51bee7aca13451167fa3e701fcd60f023eae5e61 Parents: 18b6ec1 Author: DB Tsai Authored: Tue Aug 7 10:31:11 2018 +0800 Committer: hyukjinkwon Committed: Tue Aug 7 10:31:11 2018 +0800 -- dev/merge_spark_pr.py | 20 +++- 1 file changed, 15 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/51bee7ac/dev/merge_spark_pr.py -- diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py index fd3eeb0..7a6f7d2 100755 --- a/dev/merge_spark_pr.py +++ b/dev/merge_spark_pr.py @@ -142,6 +142,11 @@ def merge_pr(pr_num, target_ref, title, body, pr_repo_desc): distinct_authors[0]) if primary_author == "": primary_author = distinct_authors[0] +else: +# When primary author is specified manually, de-dup it from author list and +# put it at the head of author list. +distinct_authors = list(filter(lambda x: x != primary_author, distinct_authors)) +distinct_authors.insert(0, primary_author) commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name, '--pretty=format:%h [%an] %s']).split("\n\n") @@ -154,13 +159,10 @@ def merge_pr(pr_num, target_ref, title, body, pr_repo_desc): # to people every time someone creates a public fork of Spark.
merge_message_flags += ["-m", body.replace("@", "")] -authors = "\n".join(["Author: %s" % a for a in distinct_authors]) - -merge_message_flags += ["-m", authors] +committer_name = run_cmd("git config --get user.name").strip() +committer_email = run_cmd("git config --get user.email").strip() if had_conflicts: -committer_name = run_cmd("git config --get user.name").strip() -committer_email = run_cmd("git config --get user.email").strip() message = "This patch had conflicts when merged, resolved by\nCommitter: %s <%s>" % ( committer_name, committer_email) merge_message_flags += ["-m", message] @@ -168,6 +170,14 @@ def merge_pr(pr_num, target_ref, title, body, pr_repo_desc): # The string "Closes #%s" string is required for GitHub to correctly close the PR merge_message_flags += ["-m", "Closes #%s from %s." % (pr_num, pr_repo_desc)] +authors = "Authored-by:" if len(distinct_authors) == 1 else "Lead-authored-by:" +authors += " %s" % (distinct_authors.pop(0)) +if len(distinct_authors) > 0: +authors += "\n" + "\n".join(["Co-authored-by: %s" % a for a in distinct_authors]) +authors += "\n" + "Signed-off-by: %s <%s>" % (committer_name, committer_email) + +merge_message_flags += ["-m", authors] + run_cmd(['git', 'commit', '--author="%s"' % primary_author] + merge_message_flags) continue_maybe("Merge complete (local ref %s). Push to %s?" % (
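The trailer-assembly logic above is compact enough to restate. The merge script itself is Python; this Scala sketch of a hypothetical `trailerBlock` helper only illustrates how the message block is built from the de-duplicated author list:

```
// Authored-by for a single author, Lead-authored-by plus Co-authored-by lines
// otherwise, then Signed-off-by for the committer.
def trailerBlock(distinctAuthors: List[String], committer: String): String = {
  val lead =
    if (distinctAuthors.size == 1) s"Authored-by: ${distinctAuthors.head}"
    else s"Lead-authored-by: ${distinctAuthors.head}"
  val coAuthored = distinctAuthors.tail.map(a => s"Co-authored-by: $a")
  ((lead :: coAuthored) :+ s"Signed-off-by: $committer").mkString("\n")
}

// trailerBlock(List("DB Tsai", "Liang-Chi Hsieh"), "hyukjinkwon") produces:
//   Lead-authored-by: DB Tsai
//   Co-authored-by: Liang-Chi Hsieh
//   Signed-off-by: hyukjinkwon
```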
spark git commit: [SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery Progress
Repository: spark Updated Branches: refs/heads/master 6afe6f32c -> 18b6ec147 [SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery Progress ## What changes were proposed in this pull request? Currently the Structured Streaming sources and sinks do not have a way to report custom metrics. Providing an option to report custom metrics and making them available via Streaming Query progress can enable sources and sinks to report custom progress information (e.g. the lag metrics for the Kafka source). Similar metrics can be reported for Sinks as well, but I would like to get initial feedback before proceeding further. ## How was this patch tested? New and existing unit tests. Closes #21721 from arunmahadevan/SPARK-24748. Authored-by: Arun Mahadevan Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18b6ec14 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18b6ec14 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18b6ec14 Branch: refs/heads/master Commit: 18b6ec14716bfafc25ae281b190547ea58b59af1 Parents: 6afe6f3 Author: Arun Mahadevan Authored: Tue Aug 7 10:28:26 2018 +0800 Committer: hyukjinkwon Committed: Tue Aug 7 10:28:26 2018 +0800 -- .../spark/sql/sources/v2/CustomMetrics.java | 33 ++ .../streaming/SupportsCustomReaderMetrics.java | 47 +++ .../streaming/SupportsCustomWriterMetrics.java | 47 +++ .../execution/streaming/ProgressReporter.scala | 63 ++-- .../streaming/sources/MicroBatchWriter.scala| 2 +- .../execution/streaming/sources/memoryV2.scala | 32 -- .../apache/spark/sql/streaming/progress.scala | 46 -- .../execution/streaming/MemorySinkV2Suite.scala | 22 +++ .../sql/streaming/StreamingQuerySuite.scala | 28 + 9 files changed, 306 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/18b6ec14/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java new file mode 100644 index 000..7011a70 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface for reporting custom metrics from streaming sources and sinks + */ +@InterfaceStability.Evolving +public interface CustomMetrics { + /** + * Returns a JSON serialized representation of custom metrics + * + * @return JSON serialized representation of custom metrics + */ + String json(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/18b6ec14/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java new file mode 100644 index 000..3b293d9 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain
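As a rough illustration of how a source could implement the new interface, here is a hypothetical example; `KafkaLagMetrics` and its fields are invented for the sketch, and only the `json()` contract comes from the patch:

```
import org.apache.spark.sql.sources.v2.CustomMetrics

// Hypothetical source-side metrics: a Kafka reader could report its lag this way.
case class KafkaLagMetrics(maxLag: Long, avgLag: Double) extends CustomMetrics {
  override def json(): String = s"""{"maxLag": $maxLag, "avgLag": $avgLag}"""
}

// KafkaLagMetrics(100L, 10.0).json() == """{"maxLag": 100, "avgLag": 10.0}"""
```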
spark git commit: [SPARK-24637][SS] Add metrics regarding state and watermark to dropwizard metrics
Repository: spark Updated Branches: refs/heads/master 1076e4f00 -> 6afe6f32c [SPARK-24637][SS] Add metrics regarding state and watermark to dropwizard metrics ## What changes were proposed in this pull request? The patch adds metrics regarding state and watermark to dropwizard metrics, so that watermark and state rows/size can be tracked in a time-series manner. ## How was this patch tested? Manually tested with CSV metric sink. Closes #21622 from HeartSaVioR/SPARK-24637. Authored-by: Jungtaek Lim Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6afe6f32 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6afe6f32 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6afe6f32 Branch: refs/heads/master Commit: 6afe6f32ca2880b13bb5fb4397b2058eef12952b Parents: 1076e4f Author: Jungtaek Lim Authored: Tue Aug 7 10:12:22 2018 +0800 Committer: hyukjinkwon Committed: Tue Aug 7 10:12:22 2018 +0800 -- .../execution/streaming/MetricsReporter.scala | 20 .../sql/streaming/StreamingQuerySuite.scala | 3 +++ 2 files changed, 23 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6afe6f32/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala index 66b11ec..8709822 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.execution.streaming +import java.text.SimpleDateFormat + import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.{Source => CodahaleSource} +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.streaming.StreamingQueryProgress /** @@ -39,6 +42,23 @@ class MetricsReporter( registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0) registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L) + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) + + registerGauge("eventTime-watermark", +progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L) + + registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 0L) + registerGauge("states-usedBytes", _.stateOperators.map(_.memoryUsedBytes).sum, 0L) + + private def convertStringDateToMillis(isoUtcDateStr: String) = { +if (isoUtcDateStr != null) { + timestampFormat.parse(isoUtcDateStr).getTime +} else { + 0L +} + } + private def registerGauge[T]( name: String, f: StreamingQueryProgress => T, http://git-wip-us.apache.org/repos/asf/spark/blob/6afe6f32/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index f37f368..9cceec9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -467,6 +467,9 @@ class StreamingQuerySuite extends
StreamTest with BeforeAndAfter with Logging wi assert(gauges.get("latency").getValue.asInstanceOf[Long] == 0) assert(gauges.get("processingRate-total").getValue.asInstanceOf[Double] == 0.0) assert(gauges.get("inputRate-total").getValue.asInstanceOf[Double] == 0.0) +assert(gauges.get("eventTime-watermark").getValue.asInstanceOf[Long] == 0) +assert(gauges.get("states-rowsTotal").getValue.asInstanceOf[Long] == 0) +assert(gauges.get("states-usedBytes").getValue.asInstanceOf[Long] == 0) sq.stop() } }
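The watermark gauge above is plain ISO-8601 parsing. A self-contained equivalent of `convertStringDateToMillis`, using the same pattern and UTC time zone but with `java.util.TimeZone` standing in for Spark's `DateTimeUtils`, looks like this:

```
import java.text.SimpleDateFormat
import java.util.TimeZone

val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))

// A missing watermark maps to 0L, matching the gauge's default value.
def convertStringDateToMillis(isoUtcDateStr: String): Long =
  if (isoUtcDateStr != null) timestampFormat.parse(isoUtcDateStr).getTime else 0L

// convertStringDateToMillis("2018-08-07T00:00:00.000Z") == 1533600000000L
```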
spark git commit: [MINOR][DOCS] Fix grammatical error in SortShuffleManager
Repository: spark Updated Branches: refs/heads/master 0f3fa2f28 -> 1076e4f00 [MINOR][DOCS] Fix grammatical error in SortShuffleManager ## What changes were proposed in this pull request? Fix a grammatical error in the comment of SortShuffleManager. ## How was this patch tested? N/A Closes #21956 from deshanxiao/master. Authored-by: deshanxiao <42019462+deshanx...@users.noreply.github.com> Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1076e4f0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1076e4f0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1076e4f0 Branch: refs/heads/master Commit: 1076e4f0026914804b5948ff0da0c84def1315cc Parents: 0f3fa2f Author: deshanxiao <42019462+deshanx...@users.noreply.github.com> Authored: Tue Aug 7 09:36:37 2018 +0800 Committer: hyukjinkwon Committed: Tue Aug 7 09:36:37 2018 +0800 -- .../scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1076e4f0/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index d9fad64..0caf84c 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -27,7 +27,7 @@ import org.apache.spark.shuffle._ * In sort-based shuffle, incoming records are sorted according to their target partition ids, then * written to a single map output file. Reducers fetch contiguous regions of this file in order to * read their portion of the map output. In cases where the map output data is too large to fit in - * memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged + * memory, sorted subsets of the output can be spilled to disk and those on-disk files are merged * to produce the final output file. * * Sort-based shuffle has two different write paths for producing its map output files:
spark git commit: [SPARK-24996][SQL] Use DSL in DeclarativeAggregate
Repository: spark Updated Branches: refs/heads/master 408a3ff2c -> 0f3fa2f28 [SPARK-24996][SQL] Use DSL in DeclarativeAggregate ## What changes were proposed in this pull request? The PR refactors the aggregate expressions which were not using DSL in order to simplify them. ## How was this patch tested? NA Author: Marco Gaido Closes #21970 from mgaido91/SPARK-24996. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f3fa2f2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f3fa2f2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f3fa2f2 Branch: refs/heads/master Commit: 0f3fa2f289f53a8ceea3b0a52fa6dc319001b10b Parents: 408a3ff Author: Marco Gaido Authored: Mon Aug 6 19:46:51 2018 -0400 Committer: Xiao Li Committed: Mon Aug 6 19:46:51 2018 -0400 -- .../apache/spark/sql/catalyst/dsl/package.scala | 2 + .../expressions/aggregate/Average.scala | 2 +- .../aggregate/CentralMomentAgg.scala| 40 +--- .../catalyst/expressions/aggregate/Corr.scala | 13 +++ .../expressions/aggregate/Covariance.scala | 16 .../catalyst/expressions/aggregate/First.scala | 7 ++-- .../catalyst/expressions/aggregate/Last.scala | 7 ++-- .../catalyst/expressions/aggregate/Max.scala| 5 ++- .../catalyst/expressions/aggregate/Min.scala| 5 ++- .../catalyst/expressions/aggregate/Sum.scala| 7 ++-- .../expressions/windowExpressions.scala | 30 +++ 11 files changed, 65 insertions(+), 69 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0f3fa2f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 75387fa..2b582b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -167,6 +167,8 @@ package object dsl { def upper(e: Expression): Expression = Upper(e) def lower(e: Expression): Expression = Lower(e) def coalesce(args: Expression*): Expression = Coalesce(args) +def greatest(args: Expression*): Expression = Greatest(args) +def least(args: Expression*): Expression = Least(args) def sqrt(e: Expression): Expression = Sqrt(e) def abs(e: Expression): Expression = Abs(e) def star(names: String*): Expression = names match { http://git-wip-us.apache.org/repos/asf/spark/blob/0f3fa2f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala index f1fad77..5ecb77b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala @@ -68,7 +68,7 @@ abstract class AverageLike(child: Expression) extends DeclarativeAggregate { Add( sum, coalesce(child.cast(sumDataType), Literal(0).cast(sumDataType))), -/* count = */ If(IsNull(child), count, count + 1L) +/* count = */ If(child.isNull, count, count + 1L) ) override lazy val updateExpressions = updateExpressionsDef http://git-wip-us.apache.org/repos/asf/spark/blob/0f3fa2f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala -- diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala index 6bbb083..e2ff0ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala @@ -75,7 +75,7 @@ abstract class CentralMomentAgg(child: Expression) val n2 = n.right val newN = n1 + n2 val delta = avg.right - avg.left -val deltaN = If(newN === Literal(0.0), Literal(0.0), delta / newN) +val deltaN = If(newN === 0.0, 0.0, delta / newN) val newAvg = avg.left + deltaN * n2 // higher order moments computed according to: @@ -102,7 +102,7 @@ abstract class CentralMomentAgg(child: Expression) } prot
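To see what the DSL buys, compare the two forms below — a sketch in the style of catalyst's own tests, assuming spark-catalyst on the classpath; the implicit conversions lift plain Scala values to `Literal` and add operator syntax to expressions:

```
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._

val child = 'child.int  // AttributeReference built via the DSL
val count = 'count.long

// Before: If(IsNull(child), count, Add(count, Literal(1L)))
// After, with the DSL doing the lifting (as in AverageLike above):
val updated = If(child.isNull, count, count + 1L)
```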
spark git commit: [SPARK-25036][SQL] Should compare ExprValue.isNull with LiteralTrue/LiteralFalse
Repository: spark Updated Branches: refs/heads/master 87ca7396c -> 408a3ff2c [SPARK-25036][SQL] Should compare ExprValue.isNull with LiteralTrue/LiteralFalse ## What changes were proposed in this pull request? This PR fixes a comparison of `ExprValue.isNull` with `String`. `ExprValue.isNull` should be compared with `LiteralTrue` or `LiteralFalse`. This causes the following compilation error using scala-2.12 with sbt. In addition, this code may also generate incorrect code in Spark 2.3. ``` /home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:94: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely always compare unequal [error] [warn] if (eval.isNull != "true") { [error] [warn] [error] [warn] /home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:126: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely never compare equal [error] [warn] if (eval.isNull == "true") { [error] [warn] [error] [warn] /home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:133: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely never compare equal [error] [warn] if (eval.isNull == "true") { [error] [warn] [error] [warn] /home/ishizaki/Spark/PR/scala212/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala:90: org.apache.spark.sql.catalyst.expressions.codegen.ExprValue and String are unrelated: they will most likely never compare equal [error] [warn] if (inputs.map(_.isNull).forall(_ == "false")) { [error] [warn] ``` ## How was this patch tested? Existing UTs Author: Kazuaki Ishizaki Closes #22012 from kiszk/SPARK-25036a. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/408a3ff2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/408a3ff2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/408a3ff2 Branch: refs/heads/master Commit: 408a3ff2c484fba5734c03dbc570b654dcbc1f23 Parents: 87ca739 Author: Kazuaki Ishizaki Authored: Mon Aug 6 19:43:21 2018 -0400 Committer: Xiao Li Committed: Mon Aug 6 19:43:21 2018 -0400 -- .../expressions/codegen/GenerateUnsafeProjection.scala | 2 +- .../spark/sql/catalyst/expressions/stringExpressions.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/408a3ff2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index 8f2a5a0..998a675 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -87,7 +87,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro // For top level row writer, it always writes to the beginning of the global buffer holder, // which means its fixed-size region always in the same position, so we don't need to call // `reset` to set up its fixed-size region every time. - if (inputs.map(_.isNull).forall(_ == "false")) { + if (inputs.map(_.isNull).forall(_ == FalseLiteral)) { // If all fields are not nullable, which means the null bits never changes, then we don't // need to clear it out every time. "" http://git-wip-us.apache.org/repos/asf/spark/blob/408a3ff2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 1838b9f..e1549d3 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -91,7 +91,7 @@ case class ConcatWs(children: Seq[Expression]) val args = ctx.fres
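The bug class is easy to model outside Spark. In this self-contained miniature (the names mirror the codegen classes, but the definitions are simplified), `isNull` is a typed value, so comparing it with a `String` can never be true; comparing against the typed literal is what the patch switches to:

```
sealed trait ExprValue
case object TrueLiteral extends ExprValue
case object FalseLiteral extends ExprValue
final case class VariableValue(name: String) extends ExprValue

// Correct: compare against the typed literal.
def neverNull(isNull: ExprValue): Boolean = isNull == FalseLiteral

// The pre-patch shape: an ExprValue and a String are unrelated types, so this
// is always false (scalac 2.12 flags exactly this, as in the build log above).
def neverNullBroken(isNull: ExprValue): Boolean = isNull == "false"
```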
svn commit: r28584 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_06_16_02-87ca739-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Aug 6 23:16:08 2018 New Revision: 28584 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_06_16_02-87ca739 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50, so it was shortened to the summary.]
spark git commit: [SPARK-24161][SS] Enable debug package feature on structured streaming
Repository: spark Updated Branches: refs/heads/master 3c96937c7 -> 87ca7396c [SPARK-24161][SS] Enable debug package feature on structured streaming ## What changes were proposed in this pull request? Currently, the debug package has an implicit class "DebugQuery" which matches Dataset to provide debug features on the Dataset class. It doesn't work with structured streaming: debugging requires the query to be already started, and the information can be retrieved from StreamingQuery, not Dataset. I guess that's why "explain" had to be placed on StreamingQuery whereas it already exists on Dataset. This patch adds a new implicit class "DebugStreamQuery" which matches StreamingQuery to provide similar debug features on the StreamingQuery class. ## How was this patch tested? Added relevant unit tests. Author: Jungtaek Lim Closes #21222 from HeartSaVioR/SPARK-24161. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/87ca7396 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/87ca7396 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/87ca7396 Branch: refs/heads/master Commit: 87ca7396c7b21a87874d8ceb32e53119c609002c Parents: 3c96937 Author: Jungtaek Lim Authored: Mon Aug 6 15:23:47 2018 -0700 Committer: Shixiong Zhu Committed: Mon Aug 6 15:23:47 2018 -0700 -- .../spark/sql/execution/debug/package.scala | 59 +- .../spark/sql/streaming/StreamSuite.scala | 116 +++ 2 files changed, 173 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/87ca7396/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index a717cbd..366e1fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -29,6 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.trees.TreeNodeRef +import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper} +import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec +import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.util.{AccumulatorV2, LongAccumulator} /** @@ -40,6 +43,16 @@ import org.apache.spark.util.{AccumulatorV2, LongAccumulator} * sql("SELECT 1").debug() * sql("SELECT 1").debugCodegen() * }}} + * + * or for streaming case (structured streaming): + * {{{ + * import org.apache.spark.sql.execution.debug._ + * val query = df.writeStream.<...>.start() + * query.debugCodegen() + * }}} + * + * Note that debug in structured streaming is not supported, because it doesn't make sense for + * streaming to execute batch once while main query is running concurrently.
*/ package object debug { @@ -89,13 +102,49 @@ package object debug { } /** + * Get WholeStageCodegenExec subtrees and the codegen in a query plan into one String + * + * @param query the streaming query for codegen + * @return single String containing all WholeStageCodegen subtrees and corresponding codegen + */ + def codegenString(query: StreamingQuery): String = { +val w = asStreamExecution(query) +if (w.lastExecution != null) { + codegenString(w.lastExecution.executedPlan) +} else { + "No physical plan. Waiting for data." +} + } + + /** + * Get WholeStageCodegenExec subtrees and the codegen in a query plan + * + * @param query the streaming query for codegen + * @return Sequence of WholeStageCodegen subtrees and corresponding codegen + */ + def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = { +val w = asStreamExecution(query) +if (w.lastExecution != null) { + codegenStringSeq(w.lastExecution.executedPlan) +} else { + Seq.empty +} + } + + private def asStreamExecution(query: StreamingQuery): StreamExecution = query match { +case wrapper: StreamingQueryWrapper => wrapper.streamingQuery +case q: StreamExecution => q +case _ => throw new IllegalArgumentException("Parameter should be an instance of " + + "StreamExecution!") + } + + /** * Augments [[Dataset]]s with debug methods. */ implicit class DebugQuery(query: Dataset[_]) extends Logging { def debug(): Unit = { - val plan = query.queryExecution.executedPlan val visited = new col
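A minimal usage sketch of the new hook, assuming a local SparkSession, the built-in rate source, and a build containing this patch:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._

val spark = SparkSession.builder().master("local[2]").appName("debug-demo").getOrCreate()
val query = spark.readStream.format("rate").load()
  .writeStream.format("console").start()

query.debugCodegen() // prints WholeStageCodegen subtrees of the last executed plan;
                     // before the first batch runs: "No physical plan. Waiting for data."
query.stop()
```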
spark git commit: [SPARK-24948][SHS] Delegate check access permissions to the file system
Repository: spark Updated Branches: refs/heads/master 278984d5a -> 3c96937c7 [SPARK-24948][SHS] Delegate check access permissions to the file system ## What changes were proposed in this pull request? In `SparkHadoopUtil.checkAccessPermission`, we consider only basic permissions in order to check whether a user can access a file or not. This is not a complete check, as it ignores ACLs and other policies a file system may apply internally. So this can result in wrongly reporting that a user cannot access a file (even though they actually can). The PR proposes to delegate to the filesystem the check whether a file is accessible or not, in order to return the right result. A caching layer is added for performance reasons. ## How was this patch tested? modified UTs Author: Marco Gaido Closes #21895 from mgaido91/SPARK-24948. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c96937c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c96937c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c96937c Branch: refs/heads/master Commit: 3c96937c7b1d7a010b630f4b98fd22dafc37808b Parents: 278984d Author: Marco Gaido Authored: Mon Aug 6 14:29:05 2018 -0700 Committer: Mridul Muralidharan Committed: Mon Aug 6 14:29:05 2018 -0700 -- .../apache/spark/deploy/SparkHadoopUtil.scala | 23 - .../deploy/history/FsHistoryProvider.scala | 67 ++ .../spark/deploy/SparkHadoopUtilSuite.scala | 97 .../deploy/history/FsHistoryProviderSuite.scala | 42 - 4 files changed, 89 insertions(+), 140 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c96937c/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 8353e64..70a8c65 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -31,7 +31,6 @@ import scala.util.control.NonFatal import com.google.common.primitives.Longs import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} -import org.apache.hadoop.fs.permission.FsAction import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.{Token, TokenIdentifier} @@ -367,28 +366,6 @@ class SparkHadoopUtil extends Logging { buffer.toString } - private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = { -val perm = status.getPermission -val ugi = UserGroupInformation.getCurrentUser - -if (ugi.getShortUserName == status.getOwner) { - if (perm.getUserAction.implies(mode)) { -return true - } -} else if (ugi.getGroupNames.contains(status.getGroup)) { - if (perm.getGroupAction.implies(mode)) { -return true - } -} else if (perm.getOtherAction.implies(mode)) { - return true -} - -logDebug(s"Permission denied: user=${ugi.getShortUserName}, " + - s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" + - s"${if (status.isDirectory) "d" else "-"}$perm") -false - } - def serialize(creds: Credentials): Array[Byte] = { val byteStream = new ByteArrayOutputStream val dataStream = new DataOutputStream(byteStream) http://git-wip-us.apache.org/repos/asf/spark/blob/3c96937c/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala -- diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index bf1eeb0..44d2390 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -21,11 +21,12 @@ import java.io.{File, FileNotFoundException, IOException} import java.nio.file.Files import java.nio.file.attribute.PosixFilePermissions import java.util.{Date, ServiceLoader} -import java.util.concurrent.{ExecutorService, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.concurrent.ExecutionException import scala.io.Source import scala.util.Try import scala.xml.Node @@ -33,8 +34,7 @@ import scala.xml.Node import com.fasterxml.jackson.annotation.JsonI
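The core idea, sketched against the public Hadoop API — the PR additionally adds caching and wires the check into FsHistoryProvider, so this standalone helper is an illustration, not the committed code:

```
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.security.AccessControlException

// Ask the file system itself whether READ access is allowed, so ACLs and any
// other server-side policies are honored, instead of re-deriving the answer
// from the basic owner/group/other permission bits.
def canRead(fs: FileSystem, path: Path): Boolean =
  try {
    fs.access(path, FsAction.READ) // throws AccessControlException when denied
    true
  } catch {
    case _: AccessControlException => false
  }
```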
svn commit: r28578 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_06_12_01-278984d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Aug 6 19:16:08 2018 New Revision: 28578 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_06_12_01-278984d docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50, so it was shortened to the summary.]
spark git commit: [SPARK-25019][BUILD] Fix orc dependency to use the same exclusion rules
Repository: spark Updated Branches: refs/heads/master 51e2b38d9 -> 278984d5a [SPARK-25019][BUILD] Fix orc dependency to use the same exclusion rules ## What changes were proposed in this pull request? While upgrading Apache ORC to 1.5.2 ([SPARK-24576](https://issues.apache.org/jira/browse/SPARK-24576)), the `sql/core` module overrode the exclusion rules of the parent pom file, which caused published `spark-sql_2.1X` artifacts to have incomplete exclusion rules ([SPARK-25019](https://issues.apache.org/jira/browse/SPARK-25019)). This PR fixes it by moving the newly added exclusion rule to the parent pom. This also fixes the sbt build hack introduced at that time. ## How was this patch tested? Pass the existing dependency check and the tests. Author: Dongjoon Hyun Closes #22003 from dongjoon-hyun/SPARK-25019. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/278984d5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/278984d5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/278984d5 Branch: refs/heads/master Commit: 278984d5a5e56136c9f940f2d0e3d2040fad180b Parents: 51e2b38 Author: Dongjoon Hyun Authored: Mon Aug 6 12:00:39 2018 -0700 Committer: Yin Huai Committed: Mon Aug 6 12:00:39 2018 -0700 -- pom.xml | 4 sql/core/pom.xml | 28 2 files changed, 4 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/278984d5/pom.xml -- diff --git a/pom.xml b/pom.xml index c46eb31..8abdb70 100644 --- a/pom.xml +++ b/pom.xml @@ -1744,6 +1744,10 @@ hadoop-common +org.apache.hadoop +hadoop-hdfs + + org.apache.hive hive-storage-api http://git-wip-us.apache.org/repos/asf/spark/blob/278984d5/sql/core/pom.xml -- diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 68b42a4..ba17f5f 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -90,39 +90,11 @@ org.apache.orc orc-core ${orc.classifier} - - - org.apache.hadoop - hadoop-hdfs - - - - org.apache.hive - hive-storage-api - - org.apache.orc orc-mapreduce ${orc.classifier} - - - org.apache.hadoop - hadoop-hdfs - - - - org.apache.hive - hive-storage-api - - org.apache.parquet
spark git commit: [SPARK-24992][CORE] spark should randomize yarn local dir selection
Repository: spark Updated Branches: refs/heads/master 1a5e46076 -> 51e2b38d9 [SPARK-24992][CORE] spark should randomize yarn local dir selection **Description: [SPARK-24992](https://issues.apache.org/jira/browse/SPARK-24992)** Utils.getLocalDir is used to get the path of a temporary directory. However, it always returns the same directory, which is the first element in the array localRootDirs. When running on YARN, this may cause us to always write to one disk, making it busy while other disks are free. We should randomize the selection to spread out the load. **What changes were proposed in this pull request?** This PR randomizes the selection of the local directory inside the method Utils.getLocalDir. This change affects the Utils.fetchFile method since it relied on the fact that Utils.getLocalDir always returns the same directory to cache files. Therefore, a new variable cachedLocalDir is used to cache the first local directory that it gets from Utils.getLocalDir. Also, when getting the configured local directories (inside Utils.getConfiguredLocalDirs), in case we are in yarn mode, the array of directories is also randomized before being returned. Author: Hieu Huynh <hieu.hu...@oath.com> Closes #21953 from hthuynh2/SPARK_24992. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51e2b38d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51e2b38d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51e2b38d Branch: refs/heads/master Commit: 51e2b38d93df8cb0cc151d5e68a2190eab52644c Parents: 1a5e460 Author: Hieu Huynh <hieu.hu...@oath.com> Authored: Mon Aug 6 13:58:28 2018 -0500 Committer: Thomas Graves Committed: Mon Aug 6 13:58:28 2018 -0500 -- .../scala/org/apache/spark/util/Utils.scala | 21 1 file changed, 17 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/51e2b38d/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a6fd363..7ec707d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -83,6 +83,7 @@ private[spark] object Utils extends Logging { val random = new Random() private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler + @volatile private var cachedLocalDir: String = "" /** * Define a default value for driver memory here since this value is referenced across the code @@ -462,7 +463,15 @@ private[spark] object Utils extends Logging { if (useCache && fetchCacheEnabled) { val cachedFileName = s"${url.hashCode}${timestamp}_cache" val lockFileName = s"${url.hashCode}${timestamp}_lock" - val localDir = new File(getLocalDir(conf)) + // Set the cachedLocalDir for the first time and re-use it later + if (cachedLocalDir.isEmpty) { +this.synchronized { + if (cachedLocalDir.isEmpty) { +cachedLocalDir = getLocalDir(conf) + } +} + } + val localDir = new File(cachedLocalDir) val lockFile = new File(localDir, lockFileName) val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel() // Only one executor entry. @@ -767,13 +776,17 @@ private[spark] object Utils extends Logging { * - Otherwise, this will return java.io.tmpdir. * * Some of these configuration options might be lists of multiple paths, but this method will - * always return a single directory. + * always return a single directory.
The return directory is chosen randomly from the array + * of directories it gets from getOrCreateLocalRootDirs. */ def getLocalDir(conf: SparkConf): String = { -getOrCreateLocalRootDirs(conf).headOption.getOrElse { +val localRootDirs = getOrCreateLocalRootDirs(conf) +if (localRootDirs.isEmpty) { val configuredLocalDirs = getConfiguredLocalDirs(conf) throw new IOException( s"Failed to get a temp directory under [${configuredLocalDirs.mkString(",")}].") +} else { + localRootDirs(scala.util.Random.nextInt(localRootDirs.length)) } } @@ -815,7 +828,7 @@ private[spark] object Utils extends Logging { // to what Yarn on this system said was available. Note this assumes that Yarn has // created the directories already, and that they are secured so that only the // user has access to them. - getYarnLocalDirs(conf).split(",") + randomizeInPlace(getYarnLocalDirs(conf).split(",")) } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) { conf.getenv("SPARK_EXECUTOR
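Stripped of Spark plumbing, the selection change amounts to this toy sketch (Spark's version additionally throws an IOException listing the configured dirs when the array is empty, as the diff shows):

```
import scala.util.Random

// Pick a random configured root dir instead of always the head, spreading
// temp-file load across disks.
def pickLocalDir(localRootDirs: Seq[String]): String = {
  require(localRootDirs.nonEmpty, "Failed to get a temp directory")
  localRootDirs(Random.nextInt(localRootDirs.length))
}
```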
svn commit: r28575 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_06_08_01-1a5e460-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Aug 6 15:16:12 2018 New Revision: 28575 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_06_08_01-1a5e460 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50, so it was shortened to the summary.]
spark git commit: [SPARK-23913][SQL] Add array_intersect function
Repository: spark Updated Branches: refs/heads/master 35700bb7f -> 1a5e46076 [SPARK-23913][SQL] Add array_intersect function ## What changes were proposed in this pull request? The PR adds the SQL function `array_intersect`. The behavior of the function is based on Presto's. This function returns an array of the elements in the intersection of array1 and array2. Note: The order of elements in the result is not defined. ## How was this patch tested? Added UTs Author: Kazuaki Ishizaki Closes #21102 from kiszk/SPARK-23913. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a5e4607 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a5e4607 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a5e4607 Branch: refs/heads/master Commit: 1a5e460762593c61b7ff2c5f3641d406706616ff Parents: 35700bb Author: Kazuaki Ishizaki Authored: Mon Aug 6 23:27:57 2018 +0900 Committer: Takuya UESHIN Committed: Mon Aug 6 23:27:57 2018 +0900 -- python/pyspark/sql/functions.py | 19 + .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/collectionOperations.scala | 386 +++ .../CollectionExpressionsSuite.scala| 112 ++ .../scala/org/apache/spark/sql/functions.scala | 11 + .../spark/sql/DataFrameFunctionsSuite.scala | 54 +++ 6 files changed, 515 insertions(+), 68 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1a5e4607/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index ec014a5..eaecf28 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2035,6 +2035,25 @@ def array_distinct(col): @ignore_unicode_prefix @since(2.4) +def array_intersect(col1, col2): +""" +Collection function: returns an array of the elements in the intersection of col1 and col2, +without duplicates.
+ +:param col1: name of column containing array +:param col2: name of column containing array + +>>> from pyspark.sql import Row +>>> df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2=["c", "d", "a", "f"])]) +>>> df.select(array_intersect(df.c1, df.c2)).collect() +[Row(array_intersect(c1, c2)=[u'a', u'c'])] +""" +sc = SparkContext._active_spark_context +return Column(sc._jvm.functions.array_intersect(_to_java_column(col1), _to_java_column(col2))) + + +@ignore_unicode_prefix +@since(2.4) def array_union(col1, col2): """ Collection function: returns an array of the elements in the union of col1 and col2, http://git-wip-us.apache.org/repos/asf/spark/blob/1a5e4607/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 35f8de1..ed2f67d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -411,6 +411,7 @@ object FunctionRegistry { expression[CreateArray]("array"), expression[ArrayContains]("array_contains"), expression[ArraysOverlap]("arrays_overlap"), +expression[ArrayIntersect]("array_intersect"), expression[ArrayJoin]("array_join"), expression[ArrayPosition]("array_position"), expression[ArraySort]("array_sort"), http://git-wip-us.apache.org/repos/asf/spark/blob/1a5e4607/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 3f94f25..e385c2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -3651,7 +3651,7 @@ case class ArrayDistinct(child: Expression) } /** - * Will become common base class for [[ArrayUnion]], ArrayIntersect, and [[ArrayExcept]]. + * Will become common base class for [[ArrayUnion]], [[ArrayIntersect]], and [[ArrayExcept]]. */ abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { override def checkInputDataTypes(): TypeCheckResult = { @@ -3672,6 +3672,75 @@ abstract c
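A quick usage sketch of the new function from SQL, assuming an active SparkSession named `spark` on a build containing this patch:

```
// Returns the intersection without duplicates; element order is not defined.
spark.sql("SELECT array_intersect(array(1, 2, 3), array(1, 3, 5))").show()
// prints a single-row result containing [1, 3]
```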
spark git commit: [SPARK-24981][CORE] ShutdownHook timeout causes job to fail when succeeded when SparkContext stop() not called by user program
Repository: spark Updated Branches: refs/heads/master c1760da5d -> 35700bb7f [SPARK-24981][CORE] ShutdownHook timeout causes job to fail when succeeded when SparkContext stop() not called by user program **Description** The issue is described in [SPARK-24981](https://issues.apache.org/jira/browse/SPARK-24981). **How does this PR fix the issue?** This PR catches the exception that is thrown while SparkContext.stop() is running (when it is called by the ShutdownHookManager). **How was this patch tested?** I manually tested it by adding a delay (60s) inside stop(). This makes the ShutdownHookManager interrupt the thread that is running stop(). The InterruptedException was caught and the job succeeded. Author: Hieu Huynh <hieu.hu...@oath.com> Author: Hieu Tri Huynh Closes #21936 from hthuynh2/SPARK_24981. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35700bb7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35700bb7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35700bb7 Branch: refs/heads/master Commit: 35700bb7f2e3008ff781a1b3a1da8147d26371be Parents: c1760da Author: Hieu Huynh <hieu.hu...@oath.com> Authored: Mon Aug 6 09:01:51 2018 -0500 Committer: Thomas Graves Committed: Mon Aug 6 09:01:51 2018 -0500
--
 core/src/main/scala/org/apache/spark/SparkContext.scala | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/35700bb7/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 03e91cd..e8bacee 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -571,7 +571,12 @@ class SparkContext(config: SparkConf) extends Logging {
     _shutdownHookRef = ShutdownHookManager.addShutdownHook(
       ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
       logInfo("Invoking stop() from shutdown hook")
-      stop()
+      try {
+        stop()
+      } catch {
+        case e: Throwable =>
+          logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
+      }
     }
   } catch {
     case NonFatal(e) =>
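The essence of the fix is the swallow-and-log wrapper around stop(). Spark's ShutdownHookManager is internal API, so this self-contained sketch reproduces the pattern with a plain JVM shutdown hook; the 60s sleep mirrors the manual test described above:

```scala
object ShutdownHookPatternDemo {
  // Stand-in for SparkContext.stop(): cleanup that can block long enough
  // for a shutdown-hook timeout to interrupt the thread running it.
  def stop(): Unit = Thread.sleep(60000)

  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      try {
        stop()
      } catch {
        // Swallow and log: by the time the hook runs, the job's real work is
        // already done, so an interrupted cleanup must not turn a success
        // into a failure.
        case e: Throwable =>
          System.err.println(s"Ignoring exception while stopping from shutdown hook: $e")
      }
    }))
    println("work finished; JVM exit will trigger the hook")
  }
}
```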
svn commit: r28572 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_06_06_39-c1760da-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Aug 6 13:59:00 2018 New Revision: 28572 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_06_06_39-c1760da docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25025][SQL] Remove the default value of isAll in INTERSECT/EXCEPT
Repository: spark Updated Branches: refs/heads/master d063e3a47 -> c1760da5d [SPARK-25025][SQL] Remove the default value of isAll in INTERSECT/EXCEPT ## What changes were proposed in this pull request? Having a default value for isAll in the logical plan nodes INTERSECT/EXCEPT could introduce bugs when callers are not aware of it. This PR removes the default value and makes callers specify it explicitly. ## How was this patch tested? This is a refactoring change. Existing tests already cover the functionality. Author: Dilip Biswal Closes #22000 from dilipbiswal/SPARK-25025. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1760da5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1760da5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1760da5 Branch: refs/heads/master Commit: c1760da5dd5576c52be4f9dd9ecd06589a6153e4 Parents: d063e3a Author: Dilip Biswal Authored: Mon Aug 6 06:56:36 2018 -0400 Committer: Xiao Li Committed: Mon Aug 6 06:56:36 2018 -0400
--
 .../apache/spark/sql/catalyst/dsl/package.scala |  4 ++--
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  6 ++---
 .../plans/logical/basicLogicalOperators.scala   |  4 ++--
 .../catalyst/analysis/AnalysisErrorSuite.scala  | 12 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala   |  6 ++---
 .../catalyst/analysis/TypeCoercionSuite.scala   | 24 +---
 .../analysis/UnsupportedOperationsSuite.scala   |  4 ++--
 .../catalyst/optimizer/ColumnPruningSuite.scala |  4 ++--
 .../optimizer/ReplaceOperatorSuite.scala        | 16 ++---
 .../sql/catalyst/parser/PlanParserSuite.scala   | 21 +
 .../plans/ConstraintPropagationSuite.scala      |  4 ++--
 .../scala/org/apache/spark/sql/Dataset.scala    |  4 ++--
 12 files changed, 60 insertions(+), 49 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c1760da5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 7997e79..75387fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -356,10 +356,10 @@ package object dsl {
     def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan)

-    def except(otherPlan: LogicalPlan, isAll: Boolean = false): LogicalPlan =
+    def except(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan =
       Except(logicalPlan, otherPlan, isAll)

-    def intersect(otherPlan: LogicalPlan, isAll: Boolean = false): LogicalPlan =
+    def intersect(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan =
       Intersect(logicalPlan, otherPlan, isAll)

     def union(otherPlan: LogicalPlan): LogicalPlan = Union(logicalPlan, otherPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/c1760da5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 9906a30..732d762 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -534,15 +534,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       case SqlBaseParser.INTERSECT if all =>
         Intersect(left, right, isAll = true)
      case SqlBaseParser.INTERSECT =>
-        Intersect(left, right)
+        Intersect(left, right, isAll = false)
       case SqlBaseParser.EXCEPT if all =>
         Except(left, right, isAll = true)
       case SqlBaseParser.EXCEPT =>
-        Except(left, right)
+        Except(left, right, isAll = false)
       case SqlBaseParser.SETMINUS if all =>
         Except(left, right, isAll = true)
       case SqlBaseParser.SETMINUS =>
-        Except(left, right)
+        Except(left, right, isAll = false)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c1760da5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOpera
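For readers less familiar with the isAll flag: it is what separates INTERSECT from INTERSECT ALL (and EXCEPT from EXCEPT ALL, with MINUS as an EXCEPT alias) at the SQL level, which is why silently defaulting it is risky. A small demo, assuming a build where the ALL variants are available (they landed earlier in the 2.4 cycle); the session setup and view names are illustrative:

```scala
import org.apache.spark.sql.SparkSession

object IsAllDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("isAll-demo").getOrCreate()
    import spark.implicits._

    Seq(1, 1, 2, 3).toDF("v").createOrReplaceTempView("a")
    Seq(1, 1, 2).toDF("v").createOrReplaceTempView("b")

    // Intersect(..., isAll = false): deduplicated result -> 1, 2
    spark.sql("SELECT v FROM a INTERSECT SELECT v FROM b").show()
    // Intersect(..., isAll = true): keeps multiplicity -> 1, 1, 2
    spark.sql("SELECT v FROM a INTERSECT ALL SELECT v FROM b").show()
    // Except(..., isAll = false): deduplicated difference -> 3
    spark.sql("SELECT v FROM a EXCEPT SELECT v FROM b").show()
    // Except(..., isAll = true): subtracts matching multiplicities -> 3
    spark.sql("SELECT v FROM a EXCEPT ALL SELECT v FROM b").show()

    spark.stop()
  }
}
```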
spark git commit: [SPARK-24940][SQL] Use IntegerLiteral in ResolveCoalesceHints
Repository: spark Updated Branches: refs/heads/master 64ad7b841 -> d063e3a47 [SPARK-24940][SQL] Use IntegerLiteral in ResolveCoalesceHints ## What changes were proposed in this pull request? Follow-up to fix an unmerged review comment. ## How was this patch tested? Unit test ResolveHintsSuite. Author: John Zhuge Closes #21998 from jzhuge/SPARK-24940. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d063e3a4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d063e3a4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d063e3a4 Branch: refs/heads/master Commit: d063e3a478221c836a0aa74a69828a526a6207bb Parents: 64ad7b8 Author: John Zhuge Authored: Mon Aug 6 06:41:55 2018 -0400 Committer: Xiao Li Committed: Mon Aug 6 06:41:55 2018 -0400
--
 .../org/apache/spark/sql/catalyst/analysis/ResolveHints.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d063e3a4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index 1ef482b..80d5105 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -20,12 +20,11 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util.Locale

 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.IntegerType

 /**
@@ -119,7 +118,7 @@ object ResolveHints {
       case "COALESCE" => false
     }
     val numPartitions = h.parameters match {
-      case Seq(Literal(numPartitions: Int, IntegerType)) =>
+      case Seq(IntegerLiteral(numPartitions)) =>
         numPartitions
       case Seq(numPartitions: Int) =>
         numPartitions
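The IntegerLiteral extractor simply packages the Literal(value, IntegerType) match into a single reusable unapply. A self-contained sketch of the idea, using simplified stand-in types rather than the real Catalyst classes:

```scala
object ResolveHintSketch {
  sealed trait Expression
  // Stand-in for Catalyst's Literal(value, dataType); the data type is
  // modeled here as a plain String for brevity.
  case class Literal(value: Any, dataType: String) extends Expression

  object IntegerLiteral {
    // Matches only integer literals and surfaces the Int directly.
    def unapply(e: Expression): Option[Int] = e match {
      case Literal(v: Int, "integer") => Some(v)
      case _ => None
    }
  }

  def numPartitions(params: Seq[Any]): Int = params match {
    // Before: case Seq(Literal(n: Int, IntegerType)) => n
    // After: the extractor hides the type tag and the cast in one place.
    case Seq(IntegerLiteral(n)) => n
    case Seq(n: Int) => n
    case _ => throw new IllegalArgumentException("COALESCE/REPARTITION hint expects an integer")
  }

  def main(args: Array[String]): Unit = {
    println(numPartitions(Seq(Literal(4, "integer"))))  // 4
    println(numPartitions(Seq(8)))                      // 8
  }
}
```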
spark git commit: [SPARK-23772][FOLLOW-UP][SQL] Provide an option to ignore column of all null values or empty array during JSON schema inference
Repository: spark Updated Branches: refs/heads/master ac527b520 -> 64ad7b841 [SPARK-23772][FOLLOW-UP][SQL] Provide an option to ignore column of all null values or empty array during JSON schema inference ## What changes were proposed in this pull request? The `dropFieldIfAllNull` parameter of the `json` method wasn't passed through as an option. This PR fixes that. ## How was this patch tested? I added a test to `sql/tests.py`. Author: Maxim Gekk Closes #22002 from MaxGekk/drop-field-if-all-null. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64ad7b84 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64ad7b84 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64ad7b84 Branch: refs/heads/master Commit: 64ad7b841d1efa979041358ee2a19aea7382d737 Parents: ac527b52 Author: Maxim Gekk Authored: Mon Aug 6 16:46:55 2018 +0800 Committer: hyukjinkwon Committed: Mon Aug 6 16:46:55 2018 +0800
--
 python/pyspark/sql/readwriter.py |  2 +-
 python/pyspark/sql/tests.py      | 16 ++++++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/64ad7b84/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 98b2cd9..abf878a 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -267,7 +267,7 @@ class DataFrameReader(OptionUtils):
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
             timestampFormat=timestampFormat, multiLine=multiLine,
             allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
-            samplingRatio=samplingRatio, encoding=encoding)
+            samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding)

http://git-wip-us.apache.org/repos/asf/spark/blob/64ad7b84/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a294d70..ed97a63 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3351,6 +3351,22 @@ class SQLTests(ReusedSQLTestCase):
         finally:
             shutil.rmtree(path)

+    def test_ignore_column_of_all_nulls(self):
+        path = tempfile.mkdtemp()
+        shutil.rmtree(path)
+        try:
+            df = self.spark.createDataFrame([["""{"a":null, "b":1, "c":3.0}"""],
+                                             ["""{"a":null, "b":null, "c":"string"}"""],
+                                             ["""{"a":null, "b":null, "c":null}"""]])
+            df.write.text(path)
+            schema = StructType([
+                StructField('b', LongType(), nullable=True),
+                StructField('c', StringType(), nullable=True)])
+            readback = self.spark.read.json(path, dropFieldIfAllNull=True)
+            self.assertEquals(readback.schema, schema)
+        finally:
+            shutil.rmtree(path)
+
     def test_repr_behaviors(self):
         import re
         pattern = re.compile(r'^ *\|', re.MULTILINE)
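The option itself already existed on the JVM side; only the Python binding was dropping it. A Scala sketch of the same behavior, mirroring the test above (session setup and temp-path handling are illustrative):

```scala
import org.apache.spark.sql.SparkSession

object DropFieldIfAllNullDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dropFieldIfAllNull-demo").getOrCreate()
    import spark.implicits._

    val path = java.nio.file.Files.createTempDirectory("json-demo").resolve("data").toString
    Seq(
      """{"a":null, "b":1, "c":3.0}""",
      """{"a":null, "b":null, "c":"string"}""",
      """{"a":null, "b":null, "c":null}"""
    ).toDF("value").write.text(path)

    // Column "a" is null in every record, so schema inference drops it.
    spark.read.option("dropFieldIfAllNull", "true").json(path).printSchema()
    // root
    //  |-- b: long (nullable = true)
    //  |-- c: string (nullable = true)

    spark.stop()
  }
}
```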
spark git commit: [SPARK-24991][SQL] use InternalRow in DataSourceWriter
Repository: spark Updated Branches: refs/heads/master 327bb3007 -> ac527b520 [SPARK-24991][SQL] use InternalRow in DataSourceWriter ## What changes were proposed in this pull request? A follow-up of #21118. Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API. ## How was this patch tested? Existing tests. Author: Wenchen Fan Closes #21948 from cloud-fan/row-write. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac527b52 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac527b52 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac527b52 Branch: refs/heads/master Commit: ac527b5205ec2826677e2b7ad0d424aa976bce81 Parents: 327bb30 Author: Wenchen Fan Authored: Mon Aug 6 15:52:01 2018 +0800 Committer: Wenchen Fan Committed: Mon Aug 6 15:52:01 2018 +0800
--
 .../spark/sql/kafka010/KafkaStreamWriter.scala  |  4 +-
 .../sql/sources/v2/writer/DataSourceWriter.java |  4 +-
 .../spark/sql/sources/v2/writer/DataWriter.java |  4 +-
 .../sources/v2/writer/DataWriterFactory.java    |  5 +-
 .../v2/writer/SupportsWriteInternalRow.java     | 41 ---
 .../datasources/v2/WriteToDataSourceV2.scala    | 30 +---
 .../streaming/MicroBatchExecution.scala         | 10 +--
 .../continuous/ContinuousWriteRDD.scala         |  6 +-
 .../WriteToContinuousDataSourceExec.scala       | 12 +---
 .../streaming/sources/ConsoleWriter.scala       | 11 ++-
 .../sources/ForeachWriterProvider.scala         | 10 +--
 .../streaming/sources/MicroBatchWriter.scala    | 21 +-
 .../sources/PackedRowWriterFactory.scala        | 15 ++--
 .../execution/streaming/sources/memoryV2.scala  | 33 +
 .../execution/streaming/MemorySinkV2Suite.scala | 18 +++--
 .../sql/sources/v2/DataSourceV2Suite.scala      |  7 --
 .../sources/v2/SimpleWritableDataSource.scala   | 72 ++--
 17 files changed, 73 insertions(+), 230 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ac527b52/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
--
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
index 32923dc..5f0802b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -42,11 +42,11 @@ case object KafkaWriterCommitMessage extends WriterCommitMessage
  */
 class KafkaStreamWriter(
     topic: Option[String], producerParams: Map[String, String], schema: StructType)
-  extends StreamWriter with SupportsWriteInternalRow {
+  extends StreamWriter {

   validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)

-  override def createInternalRowWriterFactory(): KafkaStreamWriterFactory =
+  override def createWriterFactory(): KafkaStreamWriterFactory =
     KafkaStreamWriterFactory(topic, producerParams, schema)

   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}

http://git-wip-us.apache.org/repos/asf/spark/blob/ac527b52/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
--
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
index 7eedc85..385fc29 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
@@ -18,8 +18,8 @@ package org.apache.spark.sql.sources.v2.writer;

 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.StreamWriteSupport;
 import org.apache.spark.sql.sources.v2.WriteSupport;
@@ -61,7 +61,7 @@ public interface DataSourceWriter {
    * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    * submitted.
    */
-  DataWriterFactory<Row> createWriterFactory();
+  DataWriterFactory<InternalRow> createWriterFactory();

   /**
    * Returns whether Spark should use the commit coordinator to ensure that at most one task for

http://git-wip-us.apache.org/repos/asf/spark/blob/ac527b52/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/
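To illustrate what a data source implementation looks like after this change, here is a minimal sketch of a writer that consumes InternalRow. It targets the 2.4-era DSv2 interfaces (the factory method's signature shifted during 2.4 development and the whole API was reworked in 3.x), assumes a (int, string) row schema, and CollectedRows is a hypothetical commit-message type invented for the example:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

// Hypothetical commit message carrying the formatted rows back to the driver.
case class CollectedRows(rows: Seq[String]) extends WriterCommitMessage

class CollectingDataWriter extends DataWriter[InternalRow] {
  private val buffer = ArrayBuffer.empty[String]

  // Writers now receive InternalRow directly, so values are read with the
  // typed, positional getters instead of Row's generic accessors.
  override def write(record: InternalRow): Unit =
    buffer += s"${record.getInt(0)},${record.getUTF8String(1)}"

  override def commit(): WriterCommitMessage = CollectedRows(buffer.toSeq)

  override def abort(): Unit = buffer.clear()
}

class CollectingWriterFactory extends DataWriterFactory[InternalRow] {
  // Factory signature as of the Spark 2.4 release; earlier snapshots differ.
  override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
    new CollectingDataWriter
}
```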