[spark] branch master updated: [SPARK-26525][SHUFFLE] Fast release ShuffleBlockFetcherIterator on completion of the iteration
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new aea5f50 [SPARK-26525][SHUFFLE] Fast release ShuffleBlockFetcherIterator on completion of the iteration aea5f50 is described below commit aea5f506463c19fac97547ba7a28f9dd491e3a6a Author: Liupengcheng AuthorDate: Fri Feb 1 13:47:14 2019 +0800 [SPARK-26525][SHUFFLE] Fast release ShuffleBlockFetcherIterator on completion of the iteration ## What changes were proposed in this pull request? Currently, Spark does not release a ShuffleBlockFetcherIterator until the whole task finishes, which in some conditions causes a memory leak. For example, with `rdd.repartition(m).coalesce(n, shuffle = false).save`, each `ShuffleBlockFetcherIterator` holds metadata about map statuses (`blocksByAddress`), and each result task keeps up to n (at most the number of shuffle partitions) ShuffleBlockFetcherIterators whose memory is never released until task completion, because they are referenced by the completion callbacks of the TaskContext. In some cases this consumes a large amount of memory and causes OOM. We can instead release each ShuffleBlockFetcherIterator as soon as it has been fully consumed, which is what this PR does. ## How was this patch tested? Unit test. Closes #23438 from liupc/Fast-release-shuffleblockfetcheriterator. Lead-authored-by: Liupengcheng Co-authored-by: liupengcheng Signed-off-by: Wenchen Fan --- .../spark/shuffle/BlockStoreShuffleReader.scala| 2 +- .../storage/ShuffleBlockFetcherIterator.scala | 34 -- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index daafe30..c5eefc7 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -55,7 +55,7 @@ private[spark] class BlockStoreShuffleReader[K, C]( SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS), SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM), SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT), - readMetrics) + readMetrics).toCompletionIterator val serializerInstance = dep.serializer.newInstance() diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index f73c21b..3966980 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -31,7 +31,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.shuffle._ import org.apache.spark.network.util.TransportConf import org.apache.spark.shuffle.{FetchFailedException, ShuffleReadMetricsReporter} -import org.apache.spark.util.Utils +import org.apache.spark.util.{CompletionIterator, TaskCompletionListener, Utils} import org.apache.spark.util.io.ChunkedByteBufferOutputStream /** @@ -160,6 +160,8 @@ final class ShuffleBlockFetcherIterator( @GuardedBy("this") private[this] val shuffleFilesSet = mutable.HashSet[DownloadFile]() + private[this] val onCompleteCallback = new ShuffleFetchCompletionListener(this) + 
initialize() // Decrements the buffer reference count. @@ -192,7 +194,7 @@ final class ShuffleBlockFetcherIterator( /** * Mark the iterator as zombie, and release all buffers that haven't been deserialized yet. */ - private[this] def cleanup() { + private[storage] def cleanup() { synchronized { isZombie = true } @@ -364,7 +366,7 @@ final class ShuffleBlockFetcherIterator( private[this] def initialize(): Unit = { // Add a task completion callback (called in both success case and failure case) to cleanup. -context.addTaskCompletionListener[Unit](_ => cleanup()) +context.addTaskCompletionListener(onCompleteCallback) // Split local and remote blocks. val remoteRequests = splitLocalRemoteBlocks() @@ -509,6 +511,11 @@ final class ShuffleBlockFetcherIterator( (currentResult.blockId, new BufferReleasingInputStream(input, this)) } + def toCompletionIterator: Iterator[(BlockId, InputStream)] = { +CompletionIterator[(BlockId, InputStream), this.type](this, + onCompleteCallback.onComplete(context)) + } + private def fetchUpToMaxBytes(): Unit = { // Send fetch requests up to maxBytesInFlight. If you cannot fetch from a remote
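The fix relies on the completion-iterator pattern: wrap the fetch iterator so that a cleanup callback fires as soon as the last element is consumed, instead of waiting for the task-completion callback. Below is a minimal, self-contained sketch of that pattern; the class and names are illustrative and are not Spark's internal `CompletionIterator` or `ShuffleFetchCompletionListener` API.

```scala
// Minimal sketch of the completion-iterator pattern (illustrative, not Spark's API):
// run a cleanup action exactly once, as soon as the wrapped iterator is exhausted.
class CompletingIterator[A](sub: Iterator[A], cleanup: () => Unit) extends Iterator[A] {
  private var done = false
  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !done) { done = true; cleanup() } // release buffers here, not at task end
    more
  }
  override def next(): A = sub.next()
}

object CompletingIteratorDemo extends App {
  val records = new CompletingIterator(Iterator(1, 2, 3), () => println("buffers released early"))
  records.foreach(println) // prints 1, 2, 3, then the cleanup message, while the "task" keeps running
  println("rest of the task continues without holding fetch buffers")
}
```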
[spark] branch master updated: [SPARK-26730][SQL] Strip redundant AssertNotNull for ExpressionEncoder's serializer
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8f968b4 [SPARK-26730][SQL] Strip redundant AssertNotNull for ExpressionEncoder's serializer 8f968b4 is described below commit 8f968b4c064558d6a4c38e7e2592e49d9507 Author: wuyi AuthorDate: Fri Feb 1 10:48:37 2019 +0800 [SPARK-26730][SQL] Strip redundant AssertNotNull for ExpressionEncoder's serializer ## What changes were proposed in this pull request? For types like Product, we've already add AssertNotNull when we construct serializer(see code below), so we could strip redundant AssertNotNull for those types. ``` val fieldValue = Invoke( AssertNotNull(inputObject, walkedTypePath), fieldName, dataTypeFor(fieldType), returnNullable = !fieldType.typeSymbol.asClass.isPrimitive) ``` ## How was this patch tested? Existed. Closes #23651 from Ngone51/dev-strip-redundant-assertnotnull-for-ecnoder-serializer. Authored-by: wuyi Signed-off-by: Wenchen Fan --- .../main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 1b06835..d5af91a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -631,8 +631,11 @@ object ScalaReflection extends ScalaReflection { "cannot be used as field name\n" + walkedTypePath.mkString("\n")) } - val fieldValue = Invoke( -AssertNotNull(inputObject, walkedTypePath), fieldName, dataTypeFor(fieldType), + // SPARK-26730 inputObject won't be null with If's guard below. And KnownNotNul + // is necessary here. Because for a nullable nested inputObject with struct data + // type, e.g. StructType(IntegerType, StringType), it will return nullable=true + // for IntegerType without KnownNotNull. And that's what we do not expect to. + val fieldValue = Invoke(KnownNotNull(inputObject), fieldName, dataTypeFor(fieldType), returnNullable = !fieldType.typeSymbol.asClass.isPrimitive) val clsName = getClassNameFromType(fieldType) val newPath = s"""- field (class: "$clsName", name: "$fieldName")""" +: walkedTypePath - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
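For context, the serializer being simplified here is the one derived by `ScalaReflection` for an `ExpressionEncoder` over nested `Product` types. The sketch below only shows the kind of user-level schema that exercises that path; the case classes and session setup are illustrative assumptions, not code from the commit.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical nested Product types: deriving an encoder for Outer invokes the fields of
// Inner through a nullable inputObject, which is the ScalaReflection path touched above.
case class Inner(i: Int, s: String)
case class Outer(inner: Inner)

object EncoderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("encoder-sketch").getOrCreate()
    import spark.implicits._

    val ds = Seq(Outer(Inner(1, "a")), Outer(Inner(2, "b"))).toDS()
    // The nested integer field should stay non-nullable in the schema; per the commit
    // message, that is what KnownNotNull preserves for a nullable nested inputObject.
    ds.printSchema()
    spark.stop()
  }
}
```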
[spark] branch branch-2.4 updated: [SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in JSON datasource by
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 2a83431 [SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in JSON datasource by 2a83431 is described below commit 2a8343121e62aabe5c69d1e20fbb2c01e2e520e7 Author: Hyukjin Kwon AuthorDate: Fri Feb 1 10:22:05 2019 +0800 [SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in JSON datasource by ## What changes were proposed in this pull request? This PR reverts JSON count optimization part of #21909. We cannot distinguish the cases below without parsing: ``` [{...}, {...}] ``` ``` [] ``` ``` {...} ``` ```bash # empty string ``` when we `count()`. One line (input: IN) can be, 0 record, 1 record and multiple records and this is dependent on each input. See also https://github.com/apache/spark/pull/23665#discussion_r251276720. ## How was this patch tested? Manually tested. Closes #23708 from HyukjinKwon/SPARK-26745-backport. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/sql/DataFrameReader.scala | 6 ++ .../sql/execution/datasources/FailureSafeParser.scala| 11 ++- .../sql/execution/datasources/csv/UnivocityParser.scala | 16 +++- .../sql/execution/datasources/json/JsonDataSource.scala | 6 ++ .../sql/execution/datasources/json/JsonBenchmarks.scala | 4 5 files changed, 17 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 869c584..e9278a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -450,8 +450,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { input => rawParser.parse(input, createParser, UTF8String.fromString), parsedOptions.parseMode, schema, -parsedOptions.columnNameOfCorruptRecord, -parsedOptions.multiLine) +parsedOptions.columnNameOfCorruptRecord) iter.flatMap(parser.parse) } sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming) @@ -526,8 +525,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { input => Seq(rawParser.parse(input)), parsedOptions.parseMode, schema, -parsedOptions.columnNameOfCorruptRecord, -parsedOptions.multiLine) +parsedOptions.columnNameOfCorruptRecord) iter.flatMap(parser.parse) } sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala index 90e8166..e618f17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala @@ -29,8 +29,7 @@ class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], mode: ParseMode, schema: StructType, -columnNameOfCorruptRecord: String, -isMultiLine: Boolean) { +columnNameOfCorruptRecord: String) { private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) @@ -58,15 +57,9 @@ class 
FailureSafeParser[IN]( } } - private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty - def parse(input: IN): Iterator[InternalRow] = { try { - if (skipParsing) { - Iterator.single(InternalRow.empty) - } else { - rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) - } + rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) } catch { case e: BadRecordException => mode match { case PermissiveMode => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 9088d43..42e3964 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -203,11 +203,19 @@ class UnivocityParser( } }
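To make the ambiguity concrete: one line of input JSON can hold zero, one, or several records, so a `count()` that skips parsing cannot simply return the number of input lines. A small sketch of such inputs (not part of the patch, assuming an active `SparkSession` named `spark`):

```scala
import spark.implicits._ // assumes an active SparkSession named `spark`

val lines = Seq(
  """[{"a": 1}, {"a": 2}]""", // one line, two records
  "[]",                       // one line, zero records
  """{"a": 3}""",             // one line, one record
  ""                          // empty string: ambiguous until parsed
).toDS()

val parsed = spark.read.json(lines)
parsed.show()
// A correct parsed.count() has to parse every line; a per-line shortcut would over-count here.
```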
[spark] branch master updated: [SPARK-7721][INFRA] Run and generate test coverage report from Python via Jenkins
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cdd694c [SPARK-7721][INFRA] Run and generate test coverage report from Python via Jenkins cdd694c is described below commit cdd694c52b53165acba6faabaf3a1fbaa925ac2e Author: Hyukjin Kwon AuthorDate: Fri Feb 1 10:18:08 2019 +0800 [SPARK-7721][INFRA] Run and generate test coverage report from Python via Jenkins ## What changes were proposed in this pull request? ### Background For the current status, the test script that generates coverage information was merged into Spark, https://github.com/apache/spark/pull/20204 So, we can generate the coverage report and site by, for example: ``` run-tests-with-coverage --python-executables=python3 --modules=pyspark-sql ``` like `run-tests` script in `./python`. ### Proposed change The next step is to host this coverage report via `github.io` automatically by Jenkins (see https://spark-test.github.io/pyspark-coverage-site/). This uses my testing account for Spark, spark-test, which is shared to Felix and Shivaram a long time ago for testing purpose including AppVeyor. To cut this short, this PR targets to run the coverage in [spark-master-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/) In the specific job, it will clone the page, and rebase the up-to-date PySpark test coverage from the latest commit. For instance as below: ```bash # Clone PySpark coverage site. git clone https://github.com/spark-test/pyspark-coverage-site.git # Remove existing HTMLs. rm -fr pyspark-coverage-site/* # Copy generated coverage HTMLs. cp -r .../python/test_coverage/htmlcov/* pyspark-coverage-site/ # Check out to a temporary branch. git symbolic-ref HEAD refs/heads/latest_branch # Add all the files. git add -A # Commit current HTMLs. git commit -am "Coverage report at latest commit in Apache Spark" # Delete the old branch. git branch -D gh-pages # Rename the temporary branch to master. git branch -m gh-pages # Finally, force update to our repository. git push -f origin gh-pages ``` So, it is a one single up-to-date coverage can be shown in the `github-io` page. The commands above were manually tested. ### TODOs - [x] Write a draft HyukjinKwon - [x] `pip install coverage` to all python implementations (pypy, python2, python3) in Jenkins workers - shaneknapp - [x] Set hidden `SPARK_TEST_KEY` for spark-test's password in Jenkins via Jenkins's feature This should be set in both PR builder and `spark-master-test-sbt-hadoop-2.7` so that later other PRs can test and fix the bugs - shaneknapp - [x] Set an environment variable that indicates `spark-master-test-sbt-hadoop-2.7` so that that specific build can report and update the coverage site - shaneknapp - [x] Make PR builder's test passed HyukjinKwon - [x] Fix flaky test related with coverage HyukjinKwon - 6 consecutive passes out of 7 runs This PR will be co-authored with me and shaneknapp ## How was this patch tested? It will be tested via Jenkins. Closes #23117 from HyukjinKwon/SPARK-7721. 
Lead-authored-by: Hyukjin Kwon Co-authored-by: hyukjinkwon Co-authored-by: shane knapp Signed-off-by: Hyukjin Kwon --- README.md | 1 + dev/run-tests.py | 63 -- python/pyspark/streaming/tests/test_dstream.py | 10 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index f3b90ce..271f2f5 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ [![Jenkins Build](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/badge/icon)](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7) [![AppVeyor Build](https://img.shields.io/appveyor/ci/ApacheSoftwareFoundation/spark/master.svg?style=plastic=appveyor)](https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark) +[![PySpark Coverage](https://img.shields.io/badge/dynamic/xml.svg?label=pyspark%20coverage=https%3A%2F%2Fspark-test.github.io%2Fpyspark-coverage-site=%2Fhtml%2Fbody%2Fdiv%5B1%5D%2Fdiv%2Fh1%2Fspan=brightgreen=plastic)](https://spark-test.github.io/pyspark-coverage-site) Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that diff --git a/dev/run-tests.py b/dev/run-tests.py index e1ed274..edd89c9 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py
[spark] branch master updated: [SPARK-26787] Fix standardizeLabels error message in WeightedLeastSquares
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e44f308 [SPARK-26787] Fix standardizeLabels error message in WeightedLeastSquares e44f308 is described below commit e44f308593e8cb02cdaeb5533f387c465aa60c6c Author: bscan AuthorDate: Thu Jan 31 19:50:18 2019 -0600 [SPARK-26787] Fix standardizeLabels error message in WeightedLeastSquares Error message falsely states standardization=True is causing a problem, even when standardization=False. The real issue is standardizeLabels=True, which is set automatically in LinearRegression and not currently available in the Public API. ## What changes were proposed in this pull request? A simple change to an error message. More details here: https://jira.apache.org/jira/browse/SPARK-26787 ## How was this patch tested? This does not change any functionality. Closes #23705 from bscan/bscan-errormsg-1. Authored-by: bscan Signed-off-by: Sean Owen --- .../src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index 134d6a9..9f32603 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -133,7 +133,7 @@ private[ml] class WeightedLeastSquares( return new WeightedLeastSquaresModel(coefficients, intercept, diagInvAtWA, Array(0D)) } else { require(!(regParam > 0.0 && standardizeLabel), "The standard deviation of the label is " + - "zero. Model cannot be regularized with standardization=true") + "zero. Model cannot be regularized when labels are standardized.") instr.logWarning(s"The standard deviation of the label is zero. Consider setting " + s"fitIntercept=true.") } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
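For reference, a hedged sketch of the situation that produces this message: a regularized linear regression on a label with zero standard deviation. The data and settings below are illustrative assumptions, not taken from the patch; `fit` is left commented out because it is expected to fail.

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression

// Constant label => zero label standard deviation. With the normal-equation solver and
// regParam > 0 this hits the requirement whose message the commit rewords -- even with
// setStandardization(false), because standardizeLabel is set internally by LinearRegression.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0)),
  (1.0, Vectors.dense(1.0)),
  (1.0, Vectors.dense(2.0))
)).toDF("label", "features")

val lr = new LinearRegression()
  .setSolver("normal")
  .setRegParam(0.1)
  .setStandardization(false)

// lr.fit(training)  // expected to fail with the clarified error message
```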
[spark] branch master updated: [SPARK-25997][ML] add Python example code for Power Iteration Clustering in spark.ml
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f7d87b1 [SPARK-25997][ML] add Python example code for Power Iteration Clustering in spark.ml f7d87b1 is described below commit f7d87b1685eeac3a2c8b903ddb86e1921fcc193c Author: Huaxin Gao AuthorDate: Thu Jan 31 19:33:44 2019 -0600 [SPARK-25997][ML] add Python example code for Power Iteration Clustering in spark.ml ## What changes were proposed in this pull request? Add python example for Power Iteration Clustering in spark.ml ## How was this patch tested? Manually tested Closes #22996 from huaxingao/spark-25997. Authored-by: Huaxin Gao Signed-off-by: Sean Owen --- docs/ml-clustering.md | 6 +++ .../ml/power_iteration_clustering_example.py | 49 ++ 2 files changed, 55 insertions(+) diff --git a/docs/ml-clustering.md b/docs/ml-clustering.md index 65f2652..21e38ca 100644 --- a/docs/ml-clustering.md +++ b/docs/ml-clustering.md @@ -298,6 +298,12 @@ Refer to the [Java API docs](api/java/org/apache/spark/ml/clustering/PowerIterat {% include_example java/org/apache/spark/examples/ml/JavaPowerIterationClusteringExample.java %} + +Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.clustering.PowerIterationClustering) for more details. + +{% include_example python/ml/power_iteration_clustering_example.py %} + + Refer to the [R API docs](api/R/spark.powerIterationClustering.html) for more details. diff --git a/examples/src/main/python/ml/power_iteration_clustering_example.py b/examples/src/main/python/ml/power_iteration_clustering_example.py new file mode 100644 index 000..c983c4a --- /dev/null +++ b/examples/src/main/python/ml/power_iteration_clustering_example.py @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +An example demonstrating PowerIterationClustering. +Run with: + bin/spark-submit examples/src/main/python/ml/power_iteration_clustering_example.py +""" +# $example on$ +from pyspark.ml.clustering import PowerIterationClustering +# $example off$ +from pyspark.sql import SparkSession + +if __name__ == "__main__": +spark = SparkSession\ +.builder\ +.appName("PowerIterationClusteringExample")\ +.getOrCreate() + +# $example on$ +df = spark.createDataFrame([ +(0, 1, 1.0), +(0, 2, 1.0), +(1, 2, 1.0), +(3, 4, 1.0), +(4, 0, 0.1) +], ["src", "dst", "weight"]) + +pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight") + +# Shows the cluster assignment +pic.assignClusters(df).show() +# $example off$ + +spark.stop() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [DOC][MINOR] Add metrics instance 'mesos_cluster' to monitoring doc
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0fe9c14 [DOC][MINOR] Add metrics instance 'mesos_cluster' to monitoring doc 0fe9c14 is described below commit 0fe9c144fd45e042b41a8fc69b96c377b9f08c4b Author: SongYadong AuthorDate: Thu Jan 31 18:30:17 2019 -0600 [DOC][MINOR] Add metrics instance 'mesos_cluster' to monitoring doc ## What changes were proposed in this pull request? Metrics instance "mesos_cluster" exists in spark, but not mentioned in monitoring.md. This PR add it. ## How was this patch tested? Manually test. Closes #23691 from SongYadong/doc_mesos_metrics_inst. Authored-by: SongYadong Signed-off-by: Sean Owen --- docs/monitoring.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/monitoring.md b/docs/monitoring.md index 1e292d2..726fb5c 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -659,6 +659,7 @@ set of sinks to which metrics are reported. The following instances are currentl * `driver`: The Spark driver process (the process in which your SparkContext is created). * `shuffleService`: The Spark shuffle service. * `applicationMaster`: The Spark ApplicationMaster when running on YARN. +* `mesos_cluster`: The Spark cluster scheduler when running on Mesos. Each instance can report to zero or more _sinks_. Sinks are contained in the `org.apache.spark.metrics.sink` package: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
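For illustration, the instance names listed in `monitoring.md` are the prefixes used in `conf/metrics.properties`. A hedged sketch of wiring the newly documented `mesos_cluster` instance to a sink follows; the sink choice, period, and directory are assumptions, not part of the doc change.

```properties
# conf/metrics.properties (sketch; values are illustrative)

# Report metrics from all instances to the console sink every 10 seconds.
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds

# Target only the Mesos cluster scheduler instance documented above.
mesos_cluster.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
mesos_cluster.sink.csv.directory=/tmp/spark-metrics
```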
[spark] branch branch-2.3 updated: [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.3 by this push: new 537d15c [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs 537d15c is described below commit 537d15ca5edb0f21ce2a15b30866a53675f90382 Author: Huon Wilson AuthorDate: Thu Jan 31 17:27:11 2019 -0600 [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs ## What changes were proposed in this pull request? Previously a "java.lang.UnsupportedOperationException: empty collection" exception would be thrown due to using `reduce`, rather than `fold` or similar that can tolerate empty RDDs. This behaviour has existed for the Vertex RDDs since it was introduced in b30e0ae0351be1cbc0b1cf179293587b466ee026. It seems this behaviour was inherited by the Edge RDDs via copy-paste in ee29ef3800438501e0ff207feb00a28973fc0769. ## How was this patch tested? Two new unit tests. Closes #23681 from huonw/empty-graphx. Authored-by: Huon Wilson Signed-off-by: Sean Owen (cherry picked from commit da526985c7574dccdcc0cca7452e2e999a5b3012) Signed-off-by: Sean Owen --- .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 2 +- .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala| 2 +- .../main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala | 2 +- .../src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala | 10 ++ .../test/scala/org/apache/spark/graphx/VertexRDDSuite.scala | 11 +++ .../scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala | 9 + 6 files changed, 33 insertions(+), 3 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 376c7b0..eb8abd1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( /** The number of edges in the RDD. */ override def count(): Long = { -partitionsRDD.map(_._2.size.toLong).reduce(_ + _) +partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _) } override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] = diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 3c6f22d..2da9762 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] ( /** The number of vertices in the RDD. 
*/ override def count(): Long = { -partitionsRDD.map(_.size.toLong).reduce(_ + _) +partitionsRDD.map(_.size.toLong).fold(0)(_ + _) } override private[graphx] def mapVertexPartitions[VD2: ClassTag]( diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala index 59fdd85..2847a4e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala @@ -72,7 +72,7 @@ object SVDPlusPlus { // calculate global rating mean edges.cache() -val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2)) +val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + b._1, a._2 + b._2)) val u = rs / rc // construct graph diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala index 7a24e32..8fd3e6f 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala @@ -60,4 +60,14 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext { } } + test("count") { +withSpark { sc => + val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]]) + assert(empty.count === 0) + + val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ())) + val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges)) + assert(nonempty.count === edges.size) +} + } } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala index 8e63043..434e6a8 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -223,4 +223,15 @@ class
[spark] branch master updated: [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new da52698 [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs da52698 is described below commit da526985c7574dccdcc0cca7452e2e999a5b3012 Author: Huon Wilson AuthorDate: Thu Jan 31 17:27:11 2019 -0600 [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs ## What changes were proposed in this pull request? Previously a "java.lang.UnsupportedOperationException: empty collection" exception would be thrown due to using `reduce`, rather than `fold` or similar that can tolerate empty RDDs. This behaviour has existed for the Vertex RDDs since it was introduced in b30e0ae0351be1cbc0b1cf179293587b466ee026. It seems this behaviour was inherited by the Edge RDDs via copy-paste in ee29ef3800438501e0ff207feb00a28973fc0769. ## How was this patch tested? Two new unit tests. Closes #23681 from huonw/empty-graphx. Authored-by: Huon Wilson Signed-off-by: Sean Owen --- .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 2 +- .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala| 2 +- .../main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala | 2 +- .../src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala | 10 ++ .../test/scala/org/apache/spark/graphx/VertexRDDSuite.scala | 11 +++ .../scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala | 9 + 6 files changed, 33 insertions(+), 3 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 376c7b0..eb8abd1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( /** The number of edges in the RDD. */ override def count(): Long = { -partitionsRDD.map(_._2.size.toLong).reduce(_ + _) +partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _) } override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] = diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 3c6f22d..2da9762 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] ( /** The number of vertices in the RDD. 
*/ override def count(): Long = { -partitionsRDD.map(_.size.toLong).reduce(_ + _) +partitionsRDD.map(_.size.toLong).fold(0)(_ + _) } override private[graphx] def mapVertexPartitions[VD2: ClassTag]( diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala index 59fdd85..2847a4e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala @@ -72,7 +72,7 @@ object SVDPlusPlus { // calculate global rating mean edges.cache() -val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2)) +val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + b._1, a._2 + b._2)) val u = rs / rc // construct graph diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala index 7a24e32..8fd3e6f 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala @@ -60,4 +60,14 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext { } } + test("count") { +withSpark { sc => + val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]]) + assert(empty.count === 0) + + val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ())) + val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges)) + assert(nonempty.count === edges.size) +} + } } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala index 8e63043..434e6a8 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -223,4 +223,15 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext { assert(verts.collect().toSeq === data) // test
[spark] branch branch-2.4 updated: [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 2b5e033 [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs 2b5e033 is described below commit 2b5e033eb937a8074e454e1995616f8a1bf370f8 Author: Huon Wilson AuthorDate: Thu Jan 31 17:27:11 2019 -0600 [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs ## What changes were proposed in this pull request? Previously a "java.lang.UnsupportedOperationException: empty collection" exception would be thrown due to using `reduce`, rather than `fold` or similar that can tolerate empty RDDs. This behaviour has existed for the Vertex RDDs since it was introduced in b30e0ae0351be1cbc0b1cf179293587b466ee026. It seems this behaviour was inherited by the Edge RDDs via copy-paste in ee29ef3800438501e0ff207feb00a28973fc0769. ## How was this patch tested? Two new unit tests. Closes #23681 from huonw/empty-graphx. Authored-by: Huon Wilson Signed-off-by: Sean Owen (cherry picked from commit da526985c7574dccdcc0cca7452e2e999a5b3012) Signed-off-by: Sean Owen --- .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 2 +- .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala| 2 +- .../main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala | 2 +- .../src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala | 10 ++ .../test/scala/org/apache/spark/graphx/VertexRDDSuite.scala | 11 +++ .../scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala | 9 + 6 files changed, 33 insertions(+), 3 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 376c7b0..eb8abd1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( /** The number of edges in the RDD. */ override def count(): Long = { -partitionsRDD.map(_._2.size.toLong).reduce(_ + _) +partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _) } override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] = diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 3c6f22d..2da9762 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] ( /** The number of vertices in the RDD. 
*/ override def count(): Long = { -partitionsRDD.map(_.size.toLong).reduce(_ + _) +partitionsRDD.map(_.size.toLong).fold(0)(_ + _) } override private[graphx] def mapVertexPartitions[VD2: ClassTag]( diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala index 59fdd85..2847a4e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala @@ -72,7 +72,7 @@ object SVDPlusPlus { // calculate global rating mean edges.cache() -val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2)) +val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + b._1, a._2 + b._2)) val u = rs / rc // construct graph diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala index 7a24e32..8fd3e6f 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala @@ -60,4 +60,14 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext { } } + test("count") { +withSpark { sc => + val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]]) + assert(empty.count === 0) + + val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ())) + val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges)) + assert(nonempty.count === edges.size) +} + } } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala index 8e63043..434e6a8 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -223,4 +223,15 @@ class
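The essence of the fix, identical in all three branches above: `reduce` throws on an empty RDD, while `fold` with a zero element returns the zero. A minimal sketch, assuming an existing SparkContext `sc`:

```scala
// Assumes an existing SparkContext `sc` (e.g. from a local SparkSession).
val empty = sc.emptyRDD[Long]

// empty.reduce(_ + _)            // java.lang.UnsupportedOperationException: empty collection
val total = empty.fold(0L)(_ + _) // 0 -- the behaviour count() now has for empty Edge/Vertex RDDs
println(total)
```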
[spark] branch master updated: [SPARK-26799][BUILD] Make ANTLR v4 version consistent between Maven and SBT
This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2514163 [SPARK-26799][BUILD] Make ANTLR v4 version consistent between Maven and SBT 2514163 is described below commit 2514163366feb91de665b704a6fb703855db6bee Author: seancxmao AuthorDate: Thu Jan 31 14:39:32 2019 -0800 [SPARK-26799][BUILD] Make ANTLR v4 version consistent between Maven and SBT ## What changes were proposed in this pull request? Currently ANTLR v4 versions used by Maven and SBT are slightly different. Maven uses `4.7.1` while SBT uses `4.7`. * Maven(`pom.xml`): `4.7.1` * SBT(`project/SparkBuild.scala`): `antlr4Version in Antlr4 := "4.7"` We should make Maven and SBT use a single version. Furthermore we'd better specify antlr4 version in one place to avoid mismatch between Maven and SBT in the future. This PR lets SBT use antlr4 version specified in Maven POM file, rather than specify its own antlr4 version. This is in the same as how `hadoop.version` is specified in `project/SparkBuild.scala` ## How was this patch tested? Test locally. After run `sbt compile`, Java files generated by ANTLR are located at: ``` sql/catalyst/target/scala-2.12/src_managed/main/antlr4/org/apache/spark/sql/catalyst/parser/*.java ``` These Java files have a comment at the head. We can see now SBT uses ANTLR `4.7.1`. ``` // Generated from .../spark/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 by ANTLR 4.7.1 ``` Closes #23713 from seancxmao/antlr4-version-consistent. Authored-by: seancxmao Signed-off-by: gatorsmile --- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 8d836da..65ba25c 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -568,7 +568,7 @@ object OldDeps { object Catalyst { lazy val settings = antlr4Settings ++ Seq( -antlr4Version in Antlr4 := "4.7", +antlr4Version in Antlr4 := SbtPomKeys.effectivePom.value.getProperties.get("antlr4.version").asInstanceOf[String], antlr4PackageName in Antlr4 := Some("org.apache.spark.sql.catalyst.parser"), antlr4GenListener in Antlr4 := true, antlr4GenVisitor in Antlr4 := true - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
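For completeness, the SBT setting above reads the `antlr4.version` property from the effective Maven POM, so the version is pinned in one place. A sketch of the corresponding property block in `pom.xml` (4.7.1 being the value at the time of this commit):

```xml
<!-- pom.xml (sketch): the single place both Maven and SBT now read the ANTLR v4 version from -->
<properties>
  <antlr4.version>4.7.1</antlr4.version>
</properties>
```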
[spark] branch branch-2.3 updated: [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.3 by this push: new 94a4b46 [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display 94a4b46 is described below commit 94a4b465bba39dc866c2b060efd1002b55aa7ac2 Author: 韩田田00222924 AuthorDate: Thu Jan 31 09:17:33 2019 -0800 [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display …not synchronized to the UI display ## What changes were proposed in this pull request? The amount of memory used by the broadcast variable is not synchronized to the UI display. I added the case for BroadcastBlockId and updated the memory usage. ## How was this patch tested? We can test this patch with unit tests. Closes #23649 from httfighter/SPARK-26726. Lead-authored-by: 韩田田00222924 Co-authored-by: han.tiant...@zte.com.cn Signed-off-by: Marcelo Vanzin (cherry picked from commit f4a17e916b729f9dc46e859b50a416db1e37b92e) Signed-off-by: Marcelo Vanzin --- .../apache/spark/status/AppStatusListener.scala| 44 +- .../spark/status/AppStatusListenerSuite.scala | 18 + 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 3164dc7..325e3bd 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -678,6 +678,7 @@ private[spark] class AppStatusListener( event.blockUpdatedInfo.blockId match { case block: RDDBlockId => updateRDDBlock(event, block) case stream: StreamBlockId => updateStreamBlock(event, stream) + case broadcast: BroadcastBlockId => updateBroadcastBlock(event, broadcast) case _ => } } @@ -736,15 +737,7 @@ private[spark] class AppStatusListener( // Update the executor stats first, since they are used to calculate the free memory // on tracked RDD distributions. maybeExec.foreach { exec => - if (exec.hasMemoryInfo) { -if (storageLevel.useOffHeap) { - exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta) -} else { - exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta) -} - } - exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) - exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) + updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta) } // Update the block entry in the RDD info, keeping track of the deltas above so that we @@ -846,6 +839,39 @@ private[spark] class AppStatusListener( } } + private def updateBroadcastBlock( + event: SparkListenerBlockUpdated, + broadcast: BroadcastBlockId): Unit = { +val executorId = event.blockUpdatedInfo.blockManagerId.executorId +liveExecutors.get(executorId).foreach { exec => + val now = System.nanoTime() + val storageLevel = event.blockUpdatedInfo.storageLevel + + // Whether values are being added to or removed from the existing accounting. 
+ val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) + val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) + + updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta) + maybeUpdate(exec, now) +} + } + + private def updateExecutorMemoryDiskInfo( + exec: LiveExecutor, + storageLevel: StorageLevel, + memoryDelta: Long, + diskDelta: Long): Unit = { +if (exec.hasMemoryInfo) { + if (storageLevel.useOffHeap) { +exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta) + } else { +exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta) + } +} +exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) +exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) + } + private def getOrCreateStage(info: StageInfo): LiveStage = { val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber), new Function[(Int, Int), LiveStage]() { diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 293cf0d..61ed0c8 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -875,6 +875,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { intercept[NoSuchElementException] {
[spark] branch branch-2.4 updated: [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new d9403e4 [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display d9403e4 is described below commit d9403e47f5c04f3d3c3c3ea573d38c393c5a470b Author: 韩田田00222924 AuthorDate: Thu Jan 31 09:17:33 2019 -0800 [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display …not synchronized to the UI display ## What changes were proposed in this pull request? The amount of memory used by the broadcast variable is not synchronized to the UI display. I added the case for BroadcastBlockId and updated the memory usage. ## How was this patch tested? We can test this patch with unit tests. Closes #23649 from httfighter/SPARK-26726. Lead-authored-by: 韩田田00222924 Co-authored-by: han.tiant...@zte.com.cn Signed-off-by: Marcelo Vanzin (cherry picked from commit f4a17e916b729f9dc46e859b50a416db1e37b92e) Signed-off-by: Marcelo Vanzin --- .../apache/spark/status/AppStatusListener.scala| 44 +- .../spark/status/AppStatusListenerSuite.scala | 18 + 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 5b564ef..c4dd47d 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -750,6 +750,7 @@ private[spark] class AppStatusListener( event.blockUpdatedInfo.blockId match { case block: RDDBlockId => updateRDDBlock(event, block) case stream: StreamBlockId => updateStreamBlock(event, stream) + case broadcast: BroadcastBlockId => updateBroadcastBlock(event, broadcast) case _ => } } @@ -808,15 +809,7 @@ private[spark] class AppStatusListener( // Update the executor stats first, since they are used to calculate the free memory // on tracked RDD distributions. maybeExec.foreach { exec => - if (exec.hasMemoryInfo) { -if (storageLevel.useOffHeap) { - exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta) -} else { - exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta) -} - } - exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) - exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) + updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta) } // Update the block entry in the RDD info, keeping track of the deltas above so that we @@ -918,6 +911,39 @@ private[spark] class AppStatusListener( } } + private def updateBroadcastBlock( + event: SparkListenerBlockUpdated, + broadcast: BroadcastBlockId): Unit = { +val executorId = event.blockUpdatedInfo.blockManagerId.executorId +liveExecutors.get(executorId).foreach { exec => + val now = System.nanoTime() + val storageLevel = event.blockUpdatedInfo.storageLevel + + // Whether values are being added to or removed from the existing accounting. 
+ val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) + val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) + + updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta) + maybeUpdate(exec, now) +} + } + + private def updateExecutorMemoryDiskInfo( + exec: LiveExecutor, + storageLevel: StorageLevel, + memoryDelta: Long, + diskDelta: Long): Unit = { +if (exec.hasMemoryInfo) { + if (storageLevel.useOffHeap) { +exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta) + } else { +exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta) + } +} +exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) +exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) + } + private def getOrCreateStage(info: StageInfo): LiveStage = { val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber), new Function[(Int, Int), LiveStage]() { diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index f34be48..6214089 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -939,6 +939,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { intercept[NoSuchElementException] {
[spark] branch master updated: [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f4a17e9 [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display f4a17e9 is described below commit f4a17e916b729f9dc46e859b50a416db1e37b92e Author: 韩田田00222924 AuthorDate: Thu Jan 31 09:17:33 2019 -0800 [SPARK-26726] Synchronize the amount of memory used by the broadcast variable to the UI display …not synchronized to the UI display ## What changes were proposed in this pull request? The amount of memory used by the broadcast variable is not synchronized to the UI display. I added the case for BroadcastBlockId and updated the memory usage. ## How was this patch tested? We can test this patch with unit tests. Closes #23649 from httfighter/SPARK-26726. Lead-authored-by: 韩田田00222924 Co-authored-by: han.tiant...@zte.com.cn Signed-off-by: Marcelo Vanzin --- .../apache/spark/status/AppStatusListener.scala| 44 +- .../spark/status/AppStatusListenerSuite.scala | 18 + 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index f69c7dd..3089f05 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -829,6 +829,7 @@ private[spark] class AppStatusListener( event.blockUpdatedInfo.blockId match { case block: RDDBlockId => updateRDDBlock(event, block) case stream: StreamBlockId => updateStreamBlock(event, stream) + case broadcast: BroadcastBlockId => updateBroadcastBlock(event, broadcast) case _ => } } @@ -887,15 +888,7 @@ private[spark] class AppStatusListener( // Update the executor stats first, since they are used to calculate the free memory // on tracked RDD distributions. maybeExec.foreach { exec => - if (exec.hasMemoryInfo) { -if (storageLevel.useOffHeap) { - exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta) -} else { - exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta) -} - } - exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) - exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) + updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta) } // Update the block entry in the RDD info, keeping track of the deltas above so that we @@ -997,6 +990,39 @@ private[spark] class AppStatusListener( } } + private def updateBroadcastBlock( + event: SparkListenerBlockUpdated, + broadcast: BroadcastBlockId): Unit = { +val executorId = event.blockUpdatedInfo.blockManagerId.executorId +liveExecutors.get(executorId).foreach { exec => + val now = System.nanoTime() + val storageLevel = event.blockUpdatedInfo.storageLevel + + // Whether values are being added to or removed from the existing accounting. 
+ val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1) + val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1) + + updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta) + maybeUpdate(exec, now) +} + } + + private def updateExecutorMemoryDiskInfo( + exec: LiveExecutor, + storageLevel: StorageLevel, + memoryDelta: Long, + diskDelta: Long): Unit = { +if (exec.hasMemoryInfo) { + if (storageLevel.useOffHeap) { +exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta) + } else { +exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta) + } +} +exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta) +exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta) + } + private def getOrCreateStage(info: StageInfo): LiveStage = { val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber), new Function[(Int, Int), LiveStage]() { diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 356e6d1..9469a46 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -938,6 +938,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { intercept[NoSuchElementException] { check[StreamBlockData](stream1.name) { _ => () } } + +// Update a BroadcastBlock. +val broadcast1
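A small sketch of the sign convention used by `updateBroadcastBlock` and `updateExecutorMemoryDiskInfo` above: a block update reports the block's current storage level, so a level that no longer uses memory or disk turns the reported size into a negative delta. The helper below is illustrative, not Spark code.

```scala
// Illustrative helper mirroring the delta computation in the listener above.
def sizeDelta(reportedSize: Long, levelStillUsesStore: Boolean): Long =
  reportedSize * (if (levelStillUsesStore) 1 else -1)

val afterAdd    = 0L + sizeDelta(2048L, levelStillUsesStore = true)        // 2048: block stored
val afterRemove = afterAdd + sizeDelta(2048L, levelStillUsesStore = false) // 0: block dropped
```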
[spark] branch master updated: [SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new df4c53e [SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC df4c53e is described below commit df4c53e44bc9837a470ec66486237403868cb04f Author: Gengliang Wang AuthorDate: Thu Jan 31 21:29:01 2019 +0800 [SPARK-26673][SQL] File source V2 writes: create framework and migrate ORC ## What changes were proposed in this pull request? Create a framework for write path of File Source V2. Also, migrate write path of ORC to V2. Supported: * Write to file as Dataframe Not Supported: * Partitioning, which is still under development in the data source V2 project. * Bucketing, which is still under development in the data source V2 project. * Catalog. ## How was this patch tested? Unit test Closes #23601 from gengliangwang/orc_write. Authored-by: Gengliang Wang Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/internal/SQLConf.scala| 10 ++ .../org/apache/spark/sql/DataFrameWriter.scala | 17 ++- .../sql/execution/datasources/DataSource.scala | 2 +- .../datasources/FallbackOrcDataSourceV2.scala | 4 +- .../datasources/FileFormatDataWriter.scala | 6 +- .../execution/datasources/FileFormatWriter.scala | 2 +- .../datasources/orc/OrcOutputWriter.scala | 2 +- .../execution/datasources/v2/FileBatchWrite.scala | 53 +++ .../sql/execution/datasources/v2/FileTable.scala | 5 +- .../datasources/v2/FileWriteBuilder.scala | 158 + .../datasources/v2/FileWriterFactory.scala | 58 .../datasources/v2/WriteToDataSourceV2Exec.scala | 13 +- .../execution/datasources/v2/orc/OrcTable.scala| 4 + .../datasources/v2/orc/OrcWriteBuilder.scala | 66 + .../spark/sql/FileBasedDataSourceSuite.scala | 67 - .../sources/v2/FileDataSourceV2FallBackSuite.scala | 67 - 16 files changed, 486 insertions(+), 48 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4484273..11e1a5e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1440,6 +1440,14 @@ object SQLConf { .stringConf .createWithDefault("") + val USE_V1_SOURCE_WRITER_LIST = buildConf("spark.sql.sources.write.useV1SourceList") +.internal() +.doc("A comma-separated list of data source short names or fully qualified data source" + + " register class names for which data source V2 write paths are disabled. Writes from these" + + " sources will fall back to the V1 sources.") +.stringConf +.createWithDefault("") + val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers") .doc("A comma-separated list of fully qualified data source register class names for which" + " StreamWriteSupport is disabled. 
Writes to these sources will fall back to the V1 Sinks.") @@ -2026,6 +2034,8 @@ class SQLConf extends Serializable with Logging { def userV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST) + def userV1SourceWriterList: String = getConf(USE_V1_SOURCE_WRITER_LIST) + def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS) def disabledV2StreamingMicroBatchReaders: String = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index d9404cd..e5f9473 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, WriteToDataSourceV2} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode @@ -243,8 +243,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotBucketed("save") val session = df.sparkSession -val cls = DataSource.lookupDataSource(source,
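As a quick, hedged usage sketch of the fallback knob introduced above: listing a source's short name in `spark.sql.sources.write.useV1SourceList` disables its V2 write path. The local-mode session and output path below are illustrative, not part of the commit.

```scala
// Sketch only: force ORC writes back onto the V1 path via the new config.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("orc-v1-fallback-sketch")
  // Any source named here ("orc", or a fully qualified register class name) skips
  // the new V2 write framework and falls back to the V1 writer.
  .config("spark.sql.sources.write.useV1SourceList", "orc")
  .getOrCreate()

spark.range(0, 10).toDF("id")
  .write
  .mode("overwrite")
  .orc("/tmp/orc_v1_fallback_sketch")
```

Leaving the list empty (the default) keeps ORC on the newly added V2 write path.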
[spark] branch master updated: [SPARK-19591][ML][MLLIB][FOLLOWUP] Add sample weights to decision trees - fix tolerance
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b3b62ba [SPARK-19591][ML][MLLIB][FOLLOWUP] Add sample weights to decision trees - fix tolerance b3b62ba is described below commit b3b62ba303af9daad4826d274856c61acb88a6a1 Author: Ilya Matiach AuthorDate: Thu Jan 31 05:44:55 2019 -0600 [SPARK-19591][ML][MLLIB][FOLLOWUP] Add sample weights to decision trees - fix tolerance This is a follow-up to PR: https://github.com/apache/spark/pull/21632 ## What changes were proposed in this pull request? This PR tunes the tolerance used for deciding whether to add zero feature values to a value-count map (where the key is the feature value and the value is the weighted count of those feature values). In the previous PR the tolerance scaled by the square of the unweighted number of samples, which is too aggressive for a large number of unweighted samples. Unfortunately using just "Utils.EPSILON * unweightedNumSamples" is not enough either, so I multiplied that by a factor tuned by the testing procedure below. ## How was this patch tested? This involved manually running the sample weight tests for decision tree regressor to see whether the tolerance was large enough to exclude zero feature values. Eg in SBT: ``` ./build/sbt > project mllib > testOnly *DecisionTreeRegressorSuite -- -z "training with sample weights" ``` For validation, I added a print inside the if in the code below and validated that the tolerance was large enough so that we would not include zero features (which don't exist in that test): ``` val valueCountMap = if (weightedNumSamples - partNumSamples > tolerance) { print("should not print this") partValueCountMap + (0.0 -> (weightedNumSamples - partNumSamples)) } else { partValueCountMap } ``` Closes #23682 from imatiach-msft/ilmat/sample-weights-tol. Authored-by: Ilya Matiach Signed-off-by: Sean Owen --- .../src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala index fb4c321..b041dd4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala @@ -1050,8 +1050,11 @@ private[spark] object RandomForest extends Logging with Serializable { // Calculate the expected number of samples for finding splits val weightedNumSamples = samplesFractionForFindSplits(metadata) * metadata.weightedNumExamples + // scale tolerance by number of samples with constant factor + // Note: constant factor was tuned by running some tests where there were no zero + // feature values and validating we are never within tolerance + val tolerance = Utils.EPSILON * unweightedNumSamples * 100 // add expected zero value count and get complete statistics - val tolerance = Utils.EPSILON * unweightedNumSamples * unweightedNumSamples val valueCountMap = if (weightedNumSamples - partNumSamples > tolerance) { partValueCountMap + (0.0 -> (weightedNumSamples - partNumSamples)) } else {
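To see why the quadratic tolerance was too aggressive, here is a back-of-the-envelope comparison of the old and new formulas. The epsilon value is assumed to be on the order of machine epsilon (roughly 2.2e-16, standing in for `Utils.EPSILON`), and the sample count is made up.

```scala
// Rough arithmetic only; EPSILON and the row count are illustrative assumptions.
val EPSILON = 2.2e-16
val unweightedNumSamples = 1e8 // say, 100 million rows

val oldTolerance = EPSILON * unweightedNumSamples * unweightedNumSamples // ~2.2
val newTolerance = EPSILON * unweightedNumSamples * 100                  // ~2.2e-6

// The old formula grows quadratically with the row count, so for large datasets the
// threshold can swallow a real (if small) missing zero-value weight; the new formula
// keeps the threshold proportional to the accumulated floating-point error instead.
println(f"old: $oldTolerance%.3e new: $newTolerance%.3e")
```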
[spark] branch master updated: [SPARK-24779][R] Add map_concat / map_from_entries / an option in months_between UDF to disable rounding-off
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new bc6f191 [SPARK-24779][R] Add map_concat / map_from_entries / an option in months_between UDF to disable rounding-off bc6f191 is described below commit bc6f19145192835cdfa4fc263b1c35b294c1e0ac Author: Huaxin Gao AuthorDate: Thu Jan 31 19:38:32 2019 +0800 [SPARK-24779][R] Add map_concat / map_from_entries / an option in months_between UDF to disable rounding-off ## What changes were proposed in this pull request? Add the R version of map_concat / map_from_entries / an option in months_between UDF to disable rounding-off ## How was this patch tested? Add test in test_sparkSQL.R Closes #21835 from huaxingao/spark-24779. Authored-by: Huaxin Gao Signed-off-by: Hyukjin Kwon --- R/pkg/NAMESPACE | 2 ++ R/pkg/R/functions.R | 60 +++ R/pkg/R/generics.R| 10 +- R/pkg/tests/fulltests/test_sparkSQL.R | 22 + 4 files changed, 87 insertions(+), 7 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index cfad20d..1dcad16 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -312,8 +312,10 @@ exportMethods("%<=>%", "lower", "lpad", "ltrim", + "map_concat", "map_entries", "map_from_arrays", + "map_from_entries", "map_keys", "map_values", "max", diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 58fc410..8f425b1 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -80,6 +80,11 @@ NULL #' \item \code{from_utc_timestamp}, \code{to_utc_timestamp}: time zone to use. #' \item \code{next_day}: day of the week string. #' } +#' @param ... additional argument(s). +#' \itemize{ +#' \item \code{months_between}, this contains an optional parameter to specify the +#' the result is rounded off to 8 digits. +#' } #' #' @name column_datetime_diff_functions #' @rdname column_datetime_diff_functions @@ -217,6 +222,7 @@ NULL #' additional named properties to control how it is converted and accepts the #' same options as the CSV data source. #' \item \code{arrays_zip}, this contains additional Columns of arrays to be merged. +#' \item \code{map_concat}, this contains additional Columns of maps to be unioned. 
#' } #' @name column_collection_functions #' @rdname column_collection_functions @@ -229,7 +235,7 @@ NULL #' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1), shuffle(tmp$v1))) #' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1), array_distinct(tmp$v1))) #' head(select(tmp, array_position(tmp$v1, 21), array_repeat(df$mpg, 3), array_sort(tmp$v1))) -#' head(select(tmp, flatten(tmp$v1), reverse(tmp$v1), array_remove(tmp$v1, 21))) +#' head(select(tmp, reverse(tmp$v1), array_remove(tmp$v1, 21))) #' tmp2 <- mutate(tmp, v2 = explode(tmp$v1)) #' head(tmp2) #' head(select(tmp, posexplode(tmp$v1))) @@ -238,15 +244,21 @@ NULL #' head(select(tmp, sort_array(tmp$v1, asc = FALSE))) #' tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl)) #' head(select(tmp3, map_entries(tmp3$v3), map_keys(tmp3$v3), map_values(tmp3$v3))) -#' head(select(tmp3, element_at(tmp3$v3, "Valiant"))) +#' head(select(tmp3, element_at(tmp3$v3, "Valiant"), map_concat(tmp3$v3, tmp3$v3))) #' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$cyl, df$hp)) #' head(select(tmp4, concat(tmp4$v4, tmp4$v5), arrays_overlap(tmp4$v4, tmp4$v5))) #' head(select(tmp4, array_except(tmp4$v4, tmp4$v5), array_intersect(tmp4$v4, tmp4$v5))) #' head(select(tmp4, array_union(tmp4$v4, tmp4$v5))) -#' head(select(tmp4, arrays_zip(tmp4$v4, tmp4$v5), map_from_arrays(tmp4$v4, tmp4$v5))) +#' head(select(tmp4, arrays_zip(tmp4$v4, tmp4$v5))) #' head(select(tmp, concat(df$mpg, df$cyl, df$hp))) #' tmp5 <- mutate(df, v6 = create_array(df$model, df$model)) -#' head(select(tmp5, array_join(tmp5$v6, "#"), array_join(tmp5$v6, "#", "NULL")))} +#' head(select(tmp5, array_join(tmp5$v6, "#"), array_join(tmp5$v6, "#", "NULL"))) +#' tmp6 <- mutate(df, v7 = create_array(create_array(df$model, df$model))) +#' head(select(tmp6, flatten(tmp6$v7))) +#' tmp7 <- mutate(df, v8 = create_array(df$model, df$cyl), v9 = create_array(df$model, df$hp)) +#' head(select(tmp7, map_from_arrays(tmp7$v8, tmp7$v9))) +#' tmp8 <- mutate(df, v10 = create_array(struct(df$model, df$cyl))) +#' head(select(tmp8, map_from_entries(tmp8$v10)))} NULL #' Window functions for Column operations @@ -2074,15 +2086,21 @@ setMethod("levenshtein", signature(y =
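The new R wrappers delegate to SQL functions that already exist on the JVM side (available since Spark 2.4), so the same behavior can be sketched from Scala via `selectExpr`. The data and column names below are illustrative, not taken from the commit.

```scala
// Scala-side sketch of the functions the new R wrappers expose; assumes a local session.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("map-fns-sketch").getOrCreate()
import spark.implicits._

// map_concat: union two maps into one.
Seq((Map("a" -> 1), Map("b" -> 2)))
  .toDF("m1", "m2")
  .selectExpr("map_concat(m1, m2) AS merged")
  .show(truncate = false)

// map_from_entries: build a map from an array of (key, value) structs.
Seq(Seq(("a", 1), ("b", 2)))
  .toDF("kv")
  .selectExpr("map_from_entries(kv) AS m")
  .show(truncate = false)

// months_between with rounding disabled via the third argument, mirroring the new
// roundOff option exposed to R.
Seq(("2019-01-31", "2019-01-01"))
  .toDF("d1", "d2")
  .selectExpr("months_between(d1, d2, false) AS mb")
  .show(truncate = false)
```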
[spark] branch master updated: [SPARK-26448][SQL][FOLLOWUP] should not normalize grouping expressions for final aggregate
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0e2c487 [SPARK-26448][SQL][FOLLOWUP] should not normalize grouping expressions for final aggregate 0e2c487 is described below commit 0e2c4874596269dd835bf69a5592b316345597c5 Author: Wenchen Fan AuthorDate: Thu Jan 31 16:20:18 2019 +0800 [SPARK-26448][SQL][FOLLOWUP] should not normalize grouping expressions for final aggregate ## What changes were proposed in this pull request? A followup of https://github.com/apache/spark/pull/23388 . `AggUtils.createAggregate` is not the right place to normalize the grouping expressions, as final aggregate is also created by it. The grouping expressions of final aggregate should be attributes which refer to the grouping expressions in partial aggregate. This PR moves the normalization to the caller side of `AggUtils`. ## How was this patch tested? existing tests Closes #23692 from cloud-fan/follow. Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../optimizer/NormalizeFloatingNumbers.scala | 16 -- .../spark/sql/execution/SparkStrategies.scala | 25 +++--- .../spark/sql/execution/aggregate/AggUtils.scala | 14 +++- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala index 520f24a..a5921eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala @@ -98,8 +98,10 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] { } private[sql] def normalize(expr: Expression): Expression = expr match { -case _ if expr.dataType == FloatType || expr.dataType == DoubleType => - NormalizeNaNAndZero(expr) +case _ if !needNormalize(expr.dataType) => expr + +case a: Alias => + a.withNewChildren(Seq(normalize(a.child))) case CreateNamedStruct(children) => CreateNamedStruct(children.map(normalize)) @@ -113,22 +115,22 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] { case CreateMap(children) => CreateMap(children.map(normalize)) -case a: Alias if needNormalize(a.dataType) => - a.withNewChildren(Seq(normalize(a.child))) +case _ if expr.dataType == FloatType || expr.dataType == DoubleType => + NormalizeNaNAndZero(expr) -case _ if expr.dataType.isInstanceOf[StructType] && needNormalize(expr.dataType) => +case _ if expr.dataType.isInstanceOf[StructType] => val fields = expr.dataType.asInstanceOf[StructType].fields.indices.map { i => normalize(GetStructField(expr, i)) } CreateStruct(fields) -case _ if expr.dataType.isInstanceOf[ArrayType] && needNormalize(expr.dataType) => +case _ if expr.dataType.isInstanceOf[ArrayType] => val ArrayType(et, containsNull) = expr.dataType val lv = NamedLambdaVariable("arg", et, containsNull) val function = normalize(lv) ArrayTransform(expr, LambdaFunction(function, Seq(lv))) -case _ => expr +case _ => throw new IllegalStateException(s"fail to normalize $expr") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index b7cc373..edfa704 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.optimizer.NormalizeFloatingNumbers import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -331,8 +332,17 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { val stateVersion = conf.getConf(SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION) +// Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here because +// `groupingExpressions` is not extracted during logical phase. +val normalizedGroupingExpressions = namedGroupingExpressions.map { e => + NormalizeFloatingNumbers.normalize(e) match { +case n:
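For background on what the normalization buys, here is a minimal, hedged illustration of the float-grouping semantics this rule preserves; the local session and data are illustrative. The point of the follow-up is that only the partial aggregate's grouping expressions get normalized, while the final aggregate must keep grouping by plain attributes that refer to the partial aggregate's output.

```scala
// Sketch only: -0.0 should group with 0.0, and all NaNs should group together.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("normalize-sketch").getOrCreate()
import spark.implicits._

val df = Seq(0.0, -0.0, Double.NaN, Double.NaN).toDF("v")

// With the grouping keys normalized once, in the partial aggregate, this should
// produce exactly two groups: (0.0 -> 2) and (NaN -> 2). The final aggregate then
// groups by the attributes the partial aggregate emits, with no re-normalization.
df.groupBy("v").count().show()
```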