spark git commit: [SPARK-21475][Core] Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths"
Repository: spark Updated Branches: refs/heads/master f2b3525c1 -> 14c4a62c1 [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths" ## What changes were proposed in this pull request? This reverts commit 5fd0294ff8960982cfb3b901f84bc91a9f51bf28 because of a huge performance regression. I manually fixed a minor conflict in `OneForOneBlockFetcher.java`. `Files.newInputStream` returns `sun.nio.ch.ChannelInputStream`. `ChannelInputStream` doesn't override `InputStream.skip`, so it's using the default `InputStream.skip` which just consumes and discards data. This causes a huge performance regression when reading shuffle files. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #20119 from zsxwing/revert-SPARK-21475. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14c4a62c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14c4a62c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14c4a62c Branch: refs/heads/master Commit: 14c4a62c126ac5e542f7b82565b5f0fcf3e7fa5a Parents: f2b3525 Author: Shixiong Zhu Authored: Fri Dec 29 22:33:29 2017 -0800 Committer: Shixiong Zhu Committed: Fri Dec 29 22:33:29 2017 -0800 -- .../buffer/FileSegmentManagedBuffer.java| 9 - .../network/shuffle/OneForOneBlockFetcher.java | 4 ++-- .../shuffle/ShuffleIndexInformation.java| 4 ++-- .../sort/BypassMergeSortShuffleWriter.java | 16 +++ .../spark/shuffle/sort/UnsafeShuffleWriter.java | 9 - .../shuffle/IndexShuffleBlockResolver.scala | 6 ++ .../util/collection/ExternalAppendOnlyMap.scala | 21 +--- .../spark/util/collection/ExternalSorter.scala | 17 +++- 8 files changed, 37 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java -- diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index ea9b3ce..c20fab8 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -18,13 +18,12 @@ package org.apache.spark.network.buffer; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.StandardOpenOption; import com.google.common.base.Objects; import com.google.common.io.ByteStreams; @@ -94,9 +93,9 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer { @Override public InputStream createInputStream() throws IOException { -InputStream is = null; +FileInputStream is = null; try { - is = Files.newInputStream(file.toPath()); + is = new FileInputStream(file); ByteStreams.skipFully(is, offset); return new LimitedInputStream(is, length); } catch (IOException e) { @@ -133,7 +132,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer { if (conf.lazyFileDescriptor()) { return new DefaultFileRegion(file, offset, length); } else { - FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ); + FileChannel fileChannel = new FileInputStream(file).getChannel(); return new 
DefaultFileRegion(fileChannel, offset, length); } } http://git-wip-us.apache.org/repos/asf/spark/blob/14c4a62c/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java -- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java index 3f2f20b..9cac7d0 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java @@ -18,11 +18,11 @@ package org.apache.spark.network.shuffle; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; -import java.nio.
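To make the regression mechanism above concrete: the stream returned by `Files.newInputStream` falls back to the default `InputStream.skip`, which reads and discards bytes, whereas `FileInputStream.skip` moves the file pointer without reading. The sketch below is illustrative only; it is not the reverted Spark code, the object and method names are invented, and the cost comments restate the behaviour described in the commit message.

```
// Illustrative sketch only (not the reverted Spark code); the object and method
// names are invented. Guava's ByteStreams.skipFully is used just as the real
// FileSegmentManagedBuffer does.
import java.io.{File, FileInputStream, InputStream}
import java.nio.file.Files

import com.google.common.io.ByteStreams

object SkipCostSketch {

  // FileInputStream.skip seeks on the underlying file descriptor, so jumping
  // `offset` bytes into a large shuffle file is cheap.
  def openWithSeek(file: File, offset: Long): InputStream = {
    val in = new FileInputStream(file)
    ByteStreams.skipFully(in, offset)
    in
  }

  // Files.newInputStream returns a channel-backed stream that (per the commit
  // message) does not override skip, so the default read-and-discard skip runs:
  // skipping `offset` bytes costs a full read of `offset` bytes.
  def openWithReadAndDiscard(file: File, offset: Long): InputStream = {
    val in = Files.newInputStream(file.toPath)
    ByteStreams.skipFully(in, offset)
    in
  }
}
```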
spark git commit: [SPARK-22771][SQL] Concatenate binary inputs into a binary output
Repository: spark Updated Branches: refs/heads/master 2ea17afb6 -> f2b3525c1 [SPARK-22771][SQL] Concatenate binary inputs into a binary output ## What changes were proposed in this pull request? This pr modified `concat` to concat binary inputs into a single binary output. `concat` in the current master always output data as a string. But, in some databases (e.g., PostgreSQL), if all inputs are binary, `concat` also outputs binary. ## How was this patch tested? Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`. Author: Takeshi Yamamuro Closes #19977 from maropu/SPARK-22771. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2b3525c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2b3525c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2b3525c Branch: refs/heads/master Commit: f2b3525c17d660cf6f082bbafea8632615b4f58e Parents: 2ea17af Author: Takeshi Yamamuro Authored: Sat Dec 30 14:09:56 2017 +0800 Committer: gatorsmile Committed: Sat Dec 30 14:09:56 2017 +0800 -- R/pkg/R/functions.R | 3 +- .../apache/spark/unsafe/types/ByteArray.java| 25 ++ docs/sql-programming-guide.md | 2 + python/pyspark/sql/functions.py | 3 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../sql/catalyst/analysis/TypeCoercion.scala| 26 +- .../expressions/stringExpressions.scala | 52 +++- .../sql/catalyst/optimizer/expressions.scala| 15 +- .../org/apache/spark/sql/internal/SQLConf.scala | 8 + .../catalyst/analysis/TypeCoercionSuite.scala | 54 + .../optimizer/CombineConcatsSuite.scala | 14 +- .../scala/org/apache/spark/sql/functions.scala | 3 +- .../sql-tests/inputs/string-functions.sql | 23 ++ .../inputs/typeCoercion/native/concat.sql | 93 .../sql-tests/results/string-functions.sql.out | 45 +++- .../results/typeCoercion/native/concat.sql.out | 239 +++ 16 files changed, 587 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f2b3525c/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index fff230d..55365a4 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2133,7 +2133,8 @@ setMethod("countDistinct", }) #' @details -#' \code{concat}: Concatenates multiple input string columns together into a single string column. +#' \code{concat}: Concatenates multiple input columns together into a single column. +#' If all inputs are binary, concat returns an output as binary. Otherwise, it returns as string. #' #' @rdname column_string_functions #' @aliases concat concat,Column-method http://git-wip-us.apache.org/repos/asf/spark/blob/f2b3525c/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java -- diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java index 7ced13d..c03caf0 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java @@ -74,4 +74,29 @@ public final class ByteArray { } return Arrays.copyOfRange(bytes, start, end); } + + public static byte[] concat(byte[]... 
inputs) { +// Compute the total length of the result +int totalLength = 0; +for (int i = 0; i < inputs.length; i++) { + if (inputs[i] != null) { +totalLength += inputs[i].length; + } else { +return null; + } +} + +// Allocate a new byte array, and copy the inputs one by one into it +final byte[] result = new byte[totalLength]; +int offset = 0; +for (int i = 0; i < inputs.length; i++) { + int len = inputs[i].length; + Platform.copyMemory( +inputs[i], Platform.BYTE_ARRAY_OFFSET, +result, Platform.BYTE_ARRAY_OFFSET + offset, +len); + offset += len; +} +return result; + } } http://git-wip-us.apache.org/repos/asf/spark/blob/f2b3525c/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index f02f462..4b5f56c 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1780,6 +1780,8 @@ options. - Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for
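A short, illustrative Spark shell sketch of the new `concat` semantics described above; the `spark` session, the implicits import and the column names are assumptions of this example, not part of the patch.

```
import spark.implicits._
import org.apache.spark.sql.functions.{col, concat, lit}

val df = Seq((Array[Byte](1, 2), Array[Byte](3))).toDF("a", "b")

// Both inputs are BinaryType, so after this change the result column is expected
// to be BinaryType instead of the previous StringType.
df.select(concat(col("a"), col("b"))).printSchema()

// Mixing in a non-binary input still produces a string result, as before.
df.select(concat(col("a"), lit("suffix"))).printSchema()
```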
svn commit: r23955 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_20_01-8169630-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Sat Dec 30 04:14:51 2017 New Revision: 23955 Log: Apache Spark 2.3.0-SNAPSHOT-2017_12_29_20_01-8169630 docs [This commit notification would consist of 1425 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22881][ML][TEST] ML regression package testsuite add StructuredStreaming test
Repository: spark Updated Branches: refs/heads/master 816963043 -> 2ea17afb6 [SPARK-22881][ML][TEST] ML regression package testsuite add StructuredStreaming test ## What changes were proposed in this pull request? ML regression package testsuite add StructuredStreaming test In order to make testsuite easier to modify, new helper function added in `MLTest`: ``` def testTransformerByGlobalCheckFunc[A : Encoder]( dataframe: DataFrame, transformer: Transformer, firstResultCol: String, otherResultCols: String*) (globalCheckFunction: Seq[Row] => Unit): Unit ``` ## How was this patch tested? N/A Author: WeichenXu Author: Bago Amirbekian Closes #19979 from WeichenXu123/ml_stream_test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ea17afb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ea17afb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ea17afb Branch: refs/heads/master Commit: 2ea17afb63f976500273518bf1b32f9efe250812 Parents: 8169630 Author: WeichenXu Authored: Fri Dec 29 20:06:56 2017 -0800 Committer: Joseph K. Bradley Committed: Fri Dec 29 20:06:56 2017 -0800 -- .../regression/AFTSurvivalRegressionSuite.scala | 19 .../regression/DecisionTreeRegressorSuite.scala | 43 +- .../spark/ml/regression/GBTRegressorSuite.scala | 23 +- .../GeneralizedLinearRegressionSuite.scala | 47 ++-- .../ml/regression/IsotonicRegressionSuite.scala | 43 +++--- .../ml/regression/LinearRegressionSuite.scala | 25 ++- .../scala/org/apache/spark/ml/util/MLTest.scala | 39 .../org/apache/spark/ml/util/MLTestSuite.scala | 12 - .../apache/spark/sql/streaming/StreamTest.scala | 27 ++- 9 files changed, 147 insertions(+), 131 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2ea17afb/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala index 02e5c6d..4e4ff71 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala @@ -19,19 +19,16 @@ package org.apache.spark.ml.regression import scala.util.Random -import org.apache.spark.SparkFunSuite import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamsSuite -import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.random.{ExponentialGenerator, WeibullGenerator} -import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.types._ -class AFTSurvivalRegressionSuite - extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { +class AFTSurvivalRegressionSuite extends MLTest with DefaultReadWriteTest { import testImplicits._ @@ -191,8 +188,8 @@ class AFTSurvivalRegressionSuite assert(model.predict(features) ~== responsePredictR relTol 1E-3) assert(model.predictQuantiles(features) ~== quantilePredictR relTol 1E-3) -model.transform(datasetUnivariate).select("features", "prediction", "quantiles") - .collect().foreach { +testTransformer[(Vector, Double, Double)](datasetUnivariate, model, + "features", 
"prediction", "quantiles") { case Row(features: Vector, prediction: Double, quantiles: Vector) => assert(prediction ~== model.predict(features) relTol 1E-5) assert(quantiles ~== model.predictQuantiles(features) relTol 1E-5) @@ -261,8 +258,8 @@ class AFTSurvivalRegressionSuite assert(model.predict(features) ~== responsePredictR relTol 1E-3) assert(model.predictQuantiles(features) ~== quantilePredictR relTol 1E-3) -model.transform(datasetMultivariate).select("features", "prediction", "quantiles") - .collect().foreach { +testTransformer[(Vector, Double, Double)](datasetMultivariate, model, + "features", "prediction", "quantiles") { case Row(features: Vector, prediction: Double, quantiles: Vector) => assert(prediction ~== model.predict(features) relTol 1E-5) assert(quantiles ~== model.predictQuantiles(features) relTol 1E-5) @@ -331,8 +328,8 @@ class AFTSurvivalRegressionSuite assert(model.predict(features) ~== respon
spark git commit: [SPARK-22734][ML][PYSPARK] Added Python API for VectorSizeHint.
Repository: spark Updated Branches: refs/heads/master 30fcdc038 -> 816963043 [SPARK-22734][ML][PYSPARK] Added Python API for VectorSizeHint. (Please fill in changes proposed in this fix) Python API for VectorSizeHint Transformer. (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) doc-tests. Author: Bago Amirbekian Closes #20112 from MrBago/vectorSizeHint-PythonAPI. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81696304 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81696304 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81696304 Branch: refs/heads/master Commit: 816963043a09b082e8b9280dfa97fdaa19211015 Parents: 30fcdc0 Author: Bago Amirbekian Authored: Fri Dec 29 19:45:14 2017 -0800 Committer: Joseph K. Bradley Committed: Fri Dec 29 19:45:14 2017 -0800 -- .../spark/ml/feature/VectorSizeHint.scala | 1 + python/pyspark/ml/feature.py| 79 2 files changed, 80 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/81696304/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala index 1fe3cfc..f5947d6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.types.StructType * VectorAssembler needs size information for its input columns and cannot be used on streaming * dataframes without this metadata. * + * Note: VectorSizeHint modifies `inputCol` to include size metadata and does not have an outputCol. */ @Experimental @Since("2.3.0") http://git-wip-us.apache.org/repos/asf/spark/blob/81696304/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 608f2a5..5094324 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -57,6 +57,7 @@ __all__ = ['Binarizer', 'Tokenizer', 'VectorAssembler', 'VectorIndexer', 'VectorIndexerModel', + 'VectorSizeHint', 'VectorSlicer', 'Word2Vec', 'Word2VecModel'] @@ -3466,6 +3467,84 @@ class ChiSqSelectorModel(JavaModel, JavaMLReadable, JavaMLWritable): return self._call_java("selectedFeatures") +@inherit_doc +class VectorSizeHint(JavaTransformer, HasInputCol, HasHandleInvalid, JavaMLReadable, + JavaMLWritable): +""" +.. note:: Experimental + +A feature transformer that adds size information to the metadata of a vector column. +VectorAssembler needs size information for its input columns and cannot be used on streaming +dataframes without this metadata. + +.. note:: VectorSizeHint modifies `inputCol` to include size metadata and does not have an +outputCol. 
+ +>>> from pyspark.ml.linalg import Vectors +>>> from pyspark.ml import Pipeline, PipelineModel +>>> data = [(Vectors.dense([1., 2., 3.]), 4.)] +>>> df = spark.createDataFrame(data, ["vector", "float"]) +>>> +>>> sizeHint = VectorSizeHint(inputCol="vector", size=3, handleInvalid="skip") +>>> vecAssembler = VectorAssembler(inputCols=["vector", "float"], outputCol="assembled") +>>> pipeline = Pipeline(stages=[sizeHint, vecAssembler]) +>>> +>>> pipelineModel = pipeline.fit(df) +>>> pipelineModel.transform(df).head().assembled +DenseVector([1.0, 2.0, 3.0, 4.0]) +>>> vectorSizeHintPath = temp_path + "/vector-size-hint-pipeline" +>>> pipelineModel.save(vectorSizeHintPath) +>>> loadedPipeline = PipelineModel.load(vectorSizeHintPath) +>>> loaded = loadedPipeline.transform(df).head().assembled +>>> expected = pipelineModel.transform(df).head().assembled +>>> loaded == expected +True + +.. versionadded:: 2.3.0 +""" + +size = Param(Params._dummy(), "size", "Size of vectors in column.", + typeConverter=TypeConverters.toInt) + +handleInvalid = Param(Params._dummy(), "handleInvalid", + "How to handle invalid vectors in inputCol. Invalid vectors include " + "nulls and vectors with the wrong size. The options are `skip` (filter " + "out rows with invalid vectors), `error` (throw an error) and " + "`optimistic` (do not check the vector size, and keep all rows). " + "`error` by default.",
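For comparison with the Python doctest above, here is a hedged Scala sketch of the same transformer; the data, the `spark` session, the implicits import and the exact setter names follow the usual ML conventions and are assumptions of this example.

```
import spark.implicits._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors

val df = Seq((Vectors.dense(1.0, 2.0, 3.0), 4.0)).toDF("vector", "float")

val sizeHint = new VectorSizeHint()
  .setInputCol("vector")
  .setSize(3)                  // writes size metadata onto "vector" (no output column)
  .setHandleInvalid("skip")    // drop rows whose vectors are null or the wrong size

val assembler = new VectorAssembler()
  .setInputCols(Array("vector", "float"))
  .setOutputCol("assembled")

// With the size hint in place, the assembler can also run on streaming DataFrames.
val model = new Pipeline().setStages(Array(sizeHint, assembler)).fit(df)
model.transform(df).select("assembled").show(truncate = false)
```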
spark git commit: [SPARK-22922][ML][PYSPARK] Pyspark portion of the fit-multiple API
Repository: spark Updated Branches: refs/heads/master ccda75b0d -> 30fcdc038 [SPARK-22922][ML][PYSPARK] Pyspark portion of the fit-multiple API ## What changes were proposed in this pull request? Adding fitMultiple API to `Estimator` with default implementation. Also update have ml.tuning meta-estimators use this API. ## How was this patch tested? Unit tests. Author: Bago Amirbekian Closes #20058 from MrBago/python-fitMultiple. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30fcdc03 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30fcdc03 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30fcdc03 Branch: refs/heads/master Commit: 30fcdc0380de4f107977d39d067b07e149ab2cb1 Parents: ccda75b Author: Bago Amirbekian Authored: Fri Dec 29 16:31:25 2017 -0800 Committer: Joseph K. Bradley Committed: Fri Dec 29 16:31:25 2017 -0800 -- python/pyspark/ml/base.py | 69 ++-- python/pyspark/ml/tests.py | 15 + python/pyspark/ml/tuning.py | 44 - 3 files changed, 110 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/30fcdc03/python/pyspark/ml/base.py -- diff --git a/python/pyspark/ml/base.py b/python/pyspark/ml/base.py index a6767ce..d4470b5 100644 --- a/python/pyspark/ml/base.py +++ b/python/pyspark/ml/base.py @@ -18,13 +18,52 @@ from abc import ABCMeta, abstractmethod import copy +import threading from pyspark import since -from pyspark.ml.param import Params from pyspark.ml.param.shared import * from pyspark.ml.common import inherit_doc from pyspark.sql.functions import udf -from pyspark.sql.types import StructField, StructType, DoubleType +from pyspark.sql.types import StructField, StructType + + +class _FitMultipleIterator(object): +""" +Used by default implementation of Estimator.fitMultiple to produce models in a thread safe +iterator. This class handles the simple case of fitMultiple where each param map should be +fit independently. + +:param fitSingleModel: Function: (int => Model) which fits an estimator to a dataset. +`fitSingleModel` may be called up to `numModels` times, with a unique index each time. +Each call to `fitSingleModel` with an index should return the Model associated with +that index. +:param numModel: Number of models this iterator should produce. + +See Estimator.fitMultiple for more info. +""" +def __init__(self, fitSingleModel, numModels): +""" + +""" +self.fitSingleModel = fitSingleModel +self.numModel = numModels +self.counter = 0 +self.lock = threading.Lock() + +def __iter__(self): +return self + +def __next__(self): +with self.lock: +index = self.counter +if index >= self.numModel: +raise StopIteration("No models remaining.") +self.counter += 1 +return index, self.fitSingleModel(index) + +def next(self): +"""For python2 compatibility.""" +return self.__next__() @inherit_doc @@ -47,6 +86,27 @@ class Estimator(Params): """ raise NotImplementedError() +@since("2.3.0") +def fitMultiple(self, dataset, paramMaps): +""" +Fits a model to the input dataset for each param map in `paramMaps`. + +:param dataset: input dataset, which is an instance of :py:class:`pyspark.sql.DataFrame`. +:param paramMaps: A Sequence of param maps. +:return: A thread safe iterable which contains one model for each param map. Each + call to `next(modelIterator)` will return `(index, model)` where model was fit + using `paramMaps[index]`. `index` values may not be sequential. + +.. note:: DeveloperApi +.. 
note:: Experimental +""" +estimator = self.copy() + +def fitSingleModel(index): +return estimator.fit(dataset, paramMaps[index]) + +return _FitMultipleIterator(fitSingleModel, len(paramMaps)) + @since("1.3.0") def fit(self, dataset, params=None): """ @@ -61,7 +121,10 @@ class Estimator(Params): if params is None: params = dict() if isinstance(params, (list, tuple)): -return [self.fit(dataset, paramMap) for paramMap in params] +models = [None] * len(params) +for index, model in self.fitMultiple(dataset, params): +models[index] = model +return models elif isinstance(params, dict): if params: return self.copy(params)._fit(dataset) http://git-wip-us.apache.org/repos/asf/spark/bl
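The thread-safe iterator that backs `fitMultiple` is easiest to reason about outside of the pyspark code. Below is a purely illustrative Scala rendering of the same pattern; none of these names exist in Spark.

```
// Purely illustrative: mirrors the Python _FitMultipleIterator above so the
// concurrency logic is easy to follow. Not a Spark API.
import java.util.concurrent.atomic.AtomicInteger

class FitMultipleIterator[M](fitSingleModel: Int => M, numModels: Int)
    extends Iterator[(Int, M)] {

  private val counter = new AtomicInteger(0)

  // Best-effort check; next() is the authoritative gate under contention.
  override def hasNext: Boolean = counter.get() < numModels

  override def next(): (Int, M) = {
    // Atomically claim an index so concurrent callers never fit the same
    // param map twice; indices may come back out of order.
    val index = counter.getAndIncrement()
    if (index >= numModels) {
      throw new NoSuchElementException("No models remaining.")
    }
    (index, fitSingleModel(index))
  }
}
```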
svn commit: r23954 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_16_01-ccda75b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Sat Dec 30 00:14:47 2017 New Revision: 23954 Log: Apache Spark 2.3.0-SNAPSHOT-2017_12_29_16_01-ccda75b docs [This commit notification would consist of 1425 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22921][PROJECT-INFRA] Bug fix in jira assigning
Repository: spark Updated Branches: refs/heads/master 66a7d6b30 -> ccda75b0d [SPARK-22921][PROJECT-INFRA] Bug fix in jira assigning Small bug fix from last pr, ran a successful merge with this code. Author: Imran Rashid Closes #20117 from squito/SPARK-22921. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccda75b0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccda75b0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccda75b0 Branch: refs/heads/master Commit: ccda75b0d17637abaa7ab54b68e4db8e1a16f2ce Parents: 66a7d6b Author: Imran Rashid Authored: Fri Dec 29 17:07:01 2017 -0600 Committer: Imran Rashid Committed: Fri Dec 29 17:07:01 2017 -0600 -- dev/merge_spark_pr.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ccda75b0/dev/merge_spark_pr.py -- diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py index cd5dd1e..57ca840 100755 --- a/dev/merge_spark_pr.py +++ b/dev/merge_spark_pr.py @@ -242,8 +242,8 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""): cur_summary = issue.fields.summary cur_assignee = issue.fields.assignee if cur_assignee is None: -cur_assignee = choose_jira_assignee(issue) -# Check again, we might not have chose an assignee +cur_assignee = choose_jira_assignee(issue, asf_jira) +# Check again, we might not have chosen an assignee if cur_assignee is None: cur_assignee = "NOT ASSIGNED!!!" else: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r23948 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_12_01-66a7d6b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Dec 29 20:14:47 2017 New Revision: 23948 Log: Apache Spark 2.3.0-SNAPSHOT-2017_12_29_12_01-66a7d6b docs [This commit notification would consist of 1425 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22920][SPARKR] sql functions for current_date, current_timestamp, rtrim/ltrim/trim with trimString
Repository: spark Updated Branches: refs/heads/master afc364146 -> 66a7d6b30 [SPARK-22920][SPARKR] sql functions for current_date, current_timestamp, rtrim/ltrim/trim with trimString ## What changes were proposed in this pull request? Add sql functions ## How was this patch tested? manual, unit tests Author: Felix Cheung Closes #20105 from felixcheung/rsqlfuncs. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66a7d6b3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66a7d6b3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66a7d6b3 Branch: refs/heads/master Commit: 66a7d6b30fe5581d09ef660abe2a9c8c334d29f2 Parents: afc3641 Author: Felix Cheung Authored: Fri Dec 29 10:51:43 2017 -0800 Committer: Felix Cheung Committed: Fri Dec 29 10:51:43 2017 -0800 -- R/pkg/DESCRIPTION | 1 + R/pkg/NAMESPACE | 2 + R/pkg/R/functions.R | 105 - R/pkg/R/generics.R| 17 - R/pkg/tests/fulltests/test_sparkSQL.R | 4 +- 5 files changed, 106 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/66a7d6b3/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index d1c846c..6d46c31 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -59,3 +59,4 @@ Collate: 'window.R' RoxygenNote: 5.0.1 VignetteBuilder: knitr +NeedsCompilation: no http://git-wip-us.apache.org/repos/asf/spark/blob/66a7d6b3/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index ce3eec0..3219c6f 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -228,6 +228,8 @@ exportMethods("%<=>%", "crc32", "create_array", "create_map", + "current_date", + "current_timestamp", "hash", "cume_dist", "date_add", http://git-wip-us.apache.org/repos/asf/spark/blob/66a7d6b3/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 3a96f94..fff230d 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -39,7 +39,8 @@ NULL #' Date time functions defined for \code{Column}. #' #' @param x Column to compute on. In \code{window}, it must be a time Column of -#' \code{TimestampType}. +#' \code{TimestampType}. This is not used with \code{current_date} and +#' \code{current_timestamp} #' @param format The format for the given dates or timestamps in Column \code{x}. See the #' format used in the following methods: #' \itemize{ @@ -1109,10 +1110,11 @@ setMethod("lower", }) #' @details -#' \code{ltrim}: Trims the spaces from left end for the specified string value. +#' \code{ltrim}: Trims the spaces from left end for the specified string value. Optionally a +#' \code{trimString} can be specified. 
#' #' @rdname column_string_functions -#' @aliases ltrim ltrim,Column-method +#' @aliases ltrim ltrim,Column,missing-method #' @export #' @examples #' @@ -1128,12 +1130,24 @@ setMethod("lower", #' head(tmp)} #' @note ltrim since 1.5.0 setMethod("ltrim", - signature(x = "Column"), - function(x) { + signature(x = "Column", trimString = "missing"), + function(x, trimString) { jc <- callJStatic("org.apache.spark.sql.functions", "ltrim", x@jc) column(jc) }) +#' @param trimString a character string to trim with +#' @rdname column_string_functions +#' @aliases ltrim,Column,character-method +#' @export +#' @note ltrim(Column, character) since 2.3.0 +setMethod("ltrim", + signature(x = "Column", trimString = "character"), + function(x, trimString) { +jc <- callJStatic("org.apache.spark.sql.functions", "ltrim", x@jc, trimString) +column(jc) + }) + #' @details #' \code{max}: Returns the maximum value of the expression in a group. #' @@ -1348,19 +1362,31 @@ setMethod("bround", }) #' @details -#' \code{rtrim}: Trims the spaces from right end for the specified string value. +#' \code{rtrim}: Trims the spaces from right end for the specified string value. Optionally a +#' \code{trimString} can be specified. #' #' @rdname column_string_functions -#' @aliases rtrim rtrim,Column-method +#' @aliases rtrim rtrim,Column,missing-method #' @export #' @note rtrim since 1.5.0 setMethod("rtrim", - signature(x = "Column"), - function(x) { + signature(x = "Column", trimString = "missing"), +
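The R wrappers above delegate to the corresponding Scala SQL functions. A hedged Scala sketch of the same calls follows; the DataFrame, the `spark` session and the implicits import are assumptions of this example.

```
import spark.implicits._
import org.apache.spark.sql.functions.{col, current_date, current_timestamp, ltrim, rtrim, trim}

val df = Seq("xxSparkxx").toDF("value")

df.select(
  ltrim(col("value"), "x"),     // trim 'x' from the left:  "Sparkxx"
  rtrim(col("value"), "x"),     // trim 'x' from the right: "xxSpark"
  trim(col("value"), "x"),      // trim 'x' from both ends: "Spark"
  current_date(),               // no input column needed
  current_timestamp()           // no input column needed
).show(truncate = false)
```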
spark git commit: [SPARK-22905][ML][FOLLOWUP] Fix GaussianMixtureModel save
Repository: spark Updated Branches: refs/heads/master 4e9e6aee4 -> afc364146 [SPARK-22905][ML][FOLLOWUP] Fix GaussianMixtureModel save ## What changes were proposed in this pull request? make sure model data is stored in order. WeichenXu123 ## How was this patch tested? existing tests Author: Zheng RuiFeng Closes #20113 from zhengruifeng/gmm_save. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afc36414 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afc36414 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afc36414 Branch: refs/heads/master Commit: afc36414601d3f1a1946ccf2c630f43b7b7246a8 Parents: 4e9e6ae Author: Zheng RuiFeng Authored: Fri Dec 29 10:08:03 2017 -0800 Committer: Joseph K. Bradley Committed: Fri Dec 29 10:08:03 2017 -0800 -- .../org/apache/spark/mllib/clustering/GaussianMixtureModel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/afc36414/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index afbe4f9..1933d54 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -154,7 +154,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] { val dataArray = Array.tabulate(weights.length) { i => Data(weights(i), gaussians(i).mu, gaussians(i).sigma) } - spark.createDataFrame(dataArray).repartition(1).write.parquet(Loader.dataPath(path)) + spark.createDataFrame(sc.makeRDD(dataArray, 1)).write.parquet(Loader.dataPath(path)) } def load(sc: SparkContext, path: String): GaussianMixtureModel = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
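The one-line change above is about ordering guarantees: `repartition(1)` inserts a shuffle, so rows can reach the single output partition in a nondeterministic order, while building an RDD with one partition up front preserves the order of the source array. A minimal illustrative sketch, with the `spark` session assumed and names invented:

```
// Illustrative only; not the GaussianMixtureModel code itself.
case class Data(index: Int, payload: String)
val rows = (0 until 100).map(i => Data(i, s"component-$i"))

// Shuffle in the middle: row order inside the single partition is not guaranteed.
val viaRepartition = spark.createDataFrame(rows).repartition(1)

// One partition from the start, no shuffle: rows are written in the original order.
val viaMakeRDD = spark.createDataFrame(spark.sparkContext.makeRDD(rows, 1))
```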
spark git commit: [SPARK-22864][CORE] Disable allocation schedule in ExecutorAllocationManagerSuite.
Repository: spark Updated Branches: refs/heads/master 8b497046c -> 4e9e6aee4 [SPARK-22864][CORE] Disable allocation schedule in ExecutorAllocationManagerSuite. The scheduled task was racing with the test code and could influence the values returned to the test, triggering assertions. The change adds a new config that is only used during testing, and overrides it on the affected test suite. The issue in the bug can be reliably reproduced by reducing the interval in the test (e.g. to 10ms). While there, fixed an exception that shows up in the logs while these tests run, and simplified some code (which was also causing misleading log messages in the log output of the test). Author: Marcelo Vanzin Closes #20050 from vanzin/SPARK-22864. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e9e6aee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e9e6aee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e9e6aee Branch: refs/heads/master Commit: 4e9e6aee44bb2ddb41b567d659358b22fd824222 Parents: 8b49704 Author: Marcelo Vanzin Authored: Fri Dec 29 10:51:37 2017 -0600 Committer: Imran Rashid Committed: Fri Dec 29 10:51:37 2017 -0600 -- .../spark/ExecutorAllocationManager.scala | 7 +- .../scala/org/apache/spark/SparkContext.scala | 20 .../spark/scheduler/AsyncEventQueue.scala | 2 +- .../spark/ExecutorAllocationManagerSuite.scala | 24 +--- 4 files changed, 28 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 5bc2d9e..2e00dc8 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -141,7 +141,11 @@ private[spark] class ExecutorAllocationManager( private val removeTimes = new mutable.HashMap[String, Long] // Polling loop interval (ms) - private val intervalMillis: Long = 100 + private val intervalMillis: Long = if (Utils.isTesting) { + conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100) +} else { + 100 +} // Clock used to schedule when executors should be added and removed private var clock: Clock = new SystemClock() @@ -856,4 +860,5 @@ private[spark] class ExecutorAllocationManager( private object ExecutorAllocationManager { val NOT_SET = Long.MaxValue + val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval" } http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index fcbeddd..1dbb378 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1575,10 +1575,10 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] def getExecutorIds(): Seq[String] = { schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => + case b: ExecutorAllocationClient => b.getExecutorIds() case _ => -logWarning("Requesting executors is only supported in coarse-grained mode") +logWarning("Requesting executors is not supported by current scheduler.") Nil } } @@ -1604,10 +1604,10 @@ class SparkContext(config: SparkConf) extends Logging { hostToLocalTaskCount: 
scala.collection.immutable.Map[String, Int] ): Boolean = { schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => + case b: ExecutorAllocationClient => b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount) case _ => -logWarning("Requesting executors is only supported in coarse-grained mode") +logWarning("Requesting executors is not supported by current scheduler.") false } } @@ -1620,10 +1620,10 @@ class SparkContext(config: SparkConf) extends Logging { @DeveloperApi def requestExecutors(numAdditionalExecutors: Int): Boolean = { schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => + case b: ExecutorAllocationClient => b.requestExecutors(numAdditionalExecutors) case _ => -logWarning("Requesting executors is only supported in coarse-grained mode") +logWarning("Requesting executors
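With the new test-only setting, a suite can effectively disable the background schedule so that only explicit calls in the test drive the allocation logic. A hedged sketch of the configuration follows; the master and app name are invented, and the key string is the one added by this patch, read only when the JVM runs in testing mode (i.e. `spark.testing` is set).

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("executor-allocation-manager-test")
  .set("spark.dynamicAllocation.enabled", "true")
  // Added by this patch and honored only under Utils.isTesting: a large interval
  // (in ms) keeps the polling loop from racing with the test's assertions.
  .set("spark.testing.dynamicAllocation.scheduleInterval", "10000")
```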
spark git commit: [SPARK-20654][CORE] Add config to limit disk usage of the history server.
Repository: spark Updated Branches: refs/heads/master 11a849b3a -> 8b497046c [SPARK-20654][CORE] Add config to limit disk usage of the history server. This change adds a new configuration option and support code that limits how much disk space the SHS will use. The default value is pretty generous so that applications will, hopefully, only rarely need to be replayed because of their disk stored being evicted. This works by keeping track of how much data each application is using. Also, because it's not possible to know, before replaying, how much space will be needed, it's possible that usage will exceed the configured limit temporarily. The code uses the concept of a "lease" to try to limit how much the SHS will exceed the limit in those cases. Active UIs are also tracked, so they're never deleted. This works in tandem with the existing option of how many active UIs are loaded; because unused UIs will be unloaded, their disk stores will also become candidates for deletion. If the data is not deleted, though, re-loading the UI is pretty quick. Author: Marcelo Vanzin Closes #20011 from vanzin/SPARK-20654. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b497046 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b497046 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b497046 Branch: refs/heads/master Commit: 8b497046c647a21bbed1bdfbdcb176745a1d5cd5 Parents: 11a849b Author: Marcelo Vanzin Authored: Fri Dec 29 10:40:09 2017 -0600 Committer: Imran Rashid Committed: Fri Dec 29 10:40:09 2017 -0600 -- .../deploy/history/FsHistoryProvider.scala | 173 ++ .../history/HistoryServerDiskManager.scala | 327 +++ .../apache/spark/deploy/history/config.scala| 5 + .../spark/scheduler/EventLoggingListener.scala | 15 +- .../history/HistoryServerDiskManagerSuite.scala | 160 + 5 files changed, 606 insertions(+), 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8b497046/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a299b79..94c80eb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -24,6 +24,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.util.Try import scala.xml.Node import com.fasterxml.jackson.annotation.JsonIgnore @@ -39,11 +40,13 @@ import org.fusesource.leveldbjni.internal.NativeDB import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.status._ import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} +import org.apache.spark.status.config._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} import org.apache.spark.util.kvstore._ @@ -149,6 +152,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } }.getOrElse(new InMemoryStore()) + private val diskManager = storePath.map { path => +new 
HistoryServerDiskManager(conf, path, listing, clock) + } + private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]() /** @@ -219,6 +226,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } private def startPolling(): Unit = { +diskManager.foreach(_.initialize()) + // Validate the log directory. val path = new Path(logDir) try { @@ -299,63 +308,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) attempt.adminAclsGroups.getOrElse("")) secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse("")) -val uiStorePath = storePath.map { path => getStorePath(path, appId, attemptId) } +val kvstore = try { + diskManager match { +case Some(sm) => + loadDiskStore(sm, appId, attempt) -val (kvstore, needReplay) = uiStorePath match { - case Some(path) => -try { - // The store path is not guaranteed to exist - maybe it hasn't been created, or was - // invalidated because changes to the event log were detected. Need to replay in tha
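The "lease" mechanism described above can be summarized with a small illustrative model. The class below is not the HistoryServerDiskManager API; the names and bookkeeping are invented to show how reserving an estimate up front bounds how far usage can temporarily exceed the configured limit.

```
// Illustrative model only, not the HistoryServerDiskManager API.
class DiskBudgetSketch(maxUsage: Long) {

  private var committed = 0L   // space held by finished, on-disk app stores
  private var leased = 0L      // space reserved for replays still in progress

  def free: Long = maxUsage - committed - leased

  // Reserve an estimate before replaying. The real size is unknown until the
  // replay finishes, so total usage may still exceed maxUsage temporarily;
  // the lease is what keeps that overshoot bounded.
  def lease(estimate: Long): Long = synchronized {
    leased += estimate
    estimate
  }

  // Once the store is written, swap the estimate for the actual size.
  def commit(estimate: Long, actual: Long): Unit = synchronized {
    leased -= estimate
    committed += actual
  }

  // Stores whose UIs are not active can be evicted to make room for new replays.
  def evict(actual: Long): Unit = synchronized {
    committed -= actual
  }
}
```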
svn commit: r23946 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_08_01-11a849b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Dec 29 16:20:09 2017 New Revision: 23946 Log: Apache Spark 2.3.0-SNAPSHOT-2017_12_29_08_01-11a849b docs [This commit notification would consist of 1425 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22370][SQL][PYSPARK][FOLLOW-UP] Fix a test failure when xmlrunner is installed.
Repository: spark Updated Branches: refs/heads/master dbd492b7e -> 11a849b3a [SPARK-22370][SQL][PYSPARK][FOLLOW-UP] Fix a test failure when xmlrunner is installed. ## What changes were proposed in this pull request? This is a follow-up pr of #19587. If `xmlrunner` is installed, `VectorizedUDFTests.test_vectorized_udf_check_config` fails by the following error because the `self` which is a subclass of `unittest.TestCase` in the UDF `check_records_per_batch` can't be pickled anymore. ``` PicklingError: Cannot pickle files that are not opened for reading: w ``` This changes the UDF not to refer the `self`. ## How was this patch tested? Tested locally. Author: Takuya UESHIN Closes #20115 from ueshin/issues/SPARK-22370_fup1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11a849b3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11a849b3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11a849b3 Branch: refs/heads/master Commit: 11a849b3a7b3d03c48d3e17c8a721acedfd89285 Parents: dbd492b Author: Takuya UESHIN Authored: Fri Dec 29 23:04:28 2017 +0900 Committer: hyukjinkwon Committed: Fri Dec 29 23:04:28 2017 +0900 -- python/pyspark/sql/tests.py | 9 + 1 file changed, 5 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/11a849b3/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 3ef1522..1c34c89 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3825,6 +3825,7 @@ class VectorizedUDFTests(ReusedSQLTestCase): def test_vectorized_udf_check_config(self): from pyspark.sql.functions import pandas_udf, col +import pandas as pd orig_value = self.spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch", None) self.spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 3) try: @@ -3832,11 +3833,11 @@ class VectorizedUDFTests(ReusedSQLTestCase): @pandas_udf(returnType=LongType()) def check_records_per_batch(x): -self.assertTrue(x.size <= 3) -return x +return pd.Series(x.size).repeat(x.size) -result = df.select(check_records_per_batch(col("id"))) -self.assertEqual(df.collect(), result.collect()) +result = df.select(check_records_per_batch(col("id"))).collect() +for (r,) in result: +self.assertTrue(r <= 3) finally: if orig_value is None: self.spark.conf.unset("spark.sql.execution.arrow.maxRecordsPerBatch") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-22921][PROJECT-INFRA] Choices for Assigning Jira on Merge
Repository: spark Updated Branches: refs/heads/master fcf66a327 -> dbd492b7e [SPARK-22921][PROJECT-INFRA] Choices for Assigning Jira on Merge In general jiras are assigned to the original reporter or one of the commentors. This updates the merge script to give you a simple choice to do that, so you don't have to do it manually. Author: Imran Rashid Closes #20107 from squito/SPARK-22921. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbd492b7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbd492b7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbd492b7 Branch: refs/heads/master Commit: dbd492b7e293f0bf4c13076ba21deb506c5f0969 Parents: fcf66a3 Author: Imran Rashid Authored: Fri Dec 29 07:30:49 2017 -0600 Committer: Sean Owen Committed: Fri Dec 29 07:30:49 2017 -0600 -- dev/merge_spark_pr.py | 28 1 file changed, 28 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dbd492b7/dev/merge_spark_pr.py -- diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py index 28971b8..cd5dd1e 100755 --- a/dev/merge_spark_pr.py +++ b/dev/merge_spark_pr.py @@ -242,6 +242,9 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""): cur_summary = issue.fields.summary cur_assignee = issue.fields.assignee if cur_assignee is None: +cur_assignee = choose_jira_assignee(issue) +# Check again, we might not have chose an assignee +if cur_assignee is None: cur_assignee = "NOT ASSIGNED!!!" else: cur_assignee = cur_assignee.displayName @@ -290,6 +293,31 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""): print("Successfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions)) +def choose_jira_assignee(issue, asf_jira): +""" +Prompt the user to choose who to assign the issue to in jira, given a list of candidates, +including the original reporter and all commentors +""" +reporter = issue.fields.reporter +commentors = map(lambda x: x.author, issue.fields.comment.comments) +candidates = set(commentors) +candidates.add(reporter) +candidates = list(candidates) +print("JIRA is unassigned, choose assignee") +for idx, author in enumerate(candidates): +annotations = ["Reporter"] if author == reporter else [] +if author in commentors: +annotations.append("Commentor") +print("[%d] %s (%s)" % (idx, author.displayName, ",".join(annotations))) +assignee = raw_input("Enter number of user to assign to (blank to leave unassigned):") +if assignee == "": +return None +else: +assignee = candidates[int(assignee)] +asf_jira.assign_issue(issue.key, assignee.key) +return assignee + + def resolve_jira_issues(title, merge_branches, comment): jira_ids = re.findall("SPARK-[0-9]{4,5}", title) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21657][SQL] optimize explode quadratic memory consumption
Repository: spark Updated Branches: refs/heads/master cc30ef800 -> fcf66a327 [SPARK-21657][SQL] optimize explode quadratic memory consumpation ## What changes were proposed in this pull request? The issue has been raised in two Jira tickets: [SPARK-21657](https://issues.apache.org/jira/browse/SPARK-21657), [SPARK-16998](https://issues.apache.org/jira/browse/SPARK-16998). Basically, what happens is that in collection generators like explode/inline we create many rows from each row. Currently each exploded row contains also the column on which it was created. This causes, for example, if we have a 10k array in one row that this array will get copy 10k times - to each of the row. this results a qudratic memory consumption. However, it is a common case that the original column gets projected out after the explode, so we can avoid duplicating it. In this solution we propose to identify this situation in the optimizer and turn on a flag for omitting the original column in the generation process. ## How was this patch tested? 1. We added a benchmark test to MiscBenchmark that shows x16 improvement in runtimes. 2. We ran some of the other tests in MiscBenchmark and they show 15% improvements. 3. We ran this code on a specific case from our production data with rows containing arrays of size ~200k and it reduced the runtime from 6 hours to 3 mins. Author: oraviv Author: uzadude Author: uzadude <15645757+uzad...@users.noreply.github.com> Closes #19683 from uzadude/optimize_explode. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fcf66a32 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fcf66a32 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fcf66a32 Branch: refs/heads/master Commit: fcf66a32760c74e601acb537c51b2311ece6e9d5 Parents: cc30ef8 Author: oraviv Authored: Fri Dec 29 21:08:34 2017 +0800 Committer: Wenchen Fan Committed: Fri Dec 29 21:08:34 2017 +0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 6 +-- .../sql/catalyst/analysis/CheckAnalysis.scala | 4 +- .../apache/spark/sql/catalyst/dsl/package.scala | 6 +-- .../sql/catalyst/optimizer/Optimizer.scala | 13 +++--- .../spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../plans/logical/basicLogicalOperators.scala | 21 ++ .../catalyst/optimizer/ColumnPruningSuite.scala | 44 .../optimizer/FilterPushdownSuite.scala | 14 +++ .../sql/catalyst/parser/PlanParserSuite.scala | 16 --- .../scala/org/apache/spark/sql/Dataset.scala| 4 +- .../spark/sql/execution/GenerateExec.scala | 29 ++--- .../spark/sql/execution/SparkStrategies.scala | 6 +-- .../sql/execution/benchmark/MiscBenchmark.scala | 37 13 files changed, 128 insertions(+), 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fcf66a32/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7f2128e..1f7191c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -696,7 +696,7 @@ class Analyzer( (oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions))) case oldVersion: Generate -if oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty => +if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty => 
val newOutput = oldVersion.generatorOutput.map(_.newInstance()) (oldVersion, oldVersion.copy(generatorOutput = newOutput)) @@ -1138,7 +1138,7 @@ class Analyzer( case g: Generate => val maybeResolvedExprs = exprs.map(resolveExpression(_, g)) val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, g.child) -(newExprs, g.copy(join = true, child = newChild)) +(newExprs, g.copy(unrequiredChildIndex = Nil, child = newChild)) // For `Distinct` and `SubqueryAlias`, we can't recursively resolve and add attributes // via its children. @@ -1578,7 +1578,7 @@ class Analyzer( resolvedGenerator = Generate( generator, -join = projectList.size > 1, // Only join if there are other expressions in SELECT. +unrequiredChildIndex = Nil, outer = outer, qualifier = None, generatorOutput = Re
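A small illustrative query of the shape this patch targets; the `spark` session and the implicits import are assumptions of the example.

```
import spark.implicits._
import org.apache.spark.sql.functions.explode

val df = Seq((1, Seq.range(0, 10000))).toDF("id", "arr")

// Each generated row used to carry the full 10k-element "arr" column internally,
// even though the query never reads it after the explode, which is the quadratic
// memory behaviour described above.
val exploded = df.select($"id", explode($"arr").as("value"))

// With this change, the optimizer marks "arr" as an unrequired child output of the
// Generate node, so the generated rows no longer copy it.
exploded.explain(true)
```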
svn commit: r23944 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_04_01-cc30ef8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Dec 29 12:19:07 2017 New Revision: 23944 Log: Apache Spark 2.3.0-SNAPSHOT-2017_12_29_04_01-cc30ef8 docs [This commit notification would consist of 1425 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22916][SQL] shouldn't bias towards build right if user does not specify
Repository: spark Updated Branches: refs/heads/master 224375c55 -> cc30ef800 [SPARK-22916][SQL] shouldn't bias towards build right if user does not specify ## What changes were proposed in this pull request? When there are no broadcast hints, the current spark strategies will prefer to building the right side, without considering the sizes of the two tables. This patch added the logic to consider the sizes of the two tables for the build side. To make the logic clear, the build side is determined by two steps: 1. If there are broadcast hints, the build side is determined by `broadcastSideByHints`; 2. If there are no broadcast hints, the build side is determined by `broadcastSideBySizes`; 3. If the broadcast is disabled by the config, it falls back to the next cases. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Feng Liu Closes #20099 from liufengdb/fix-spark-strategies. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc30ef80 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc30ef80 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc30ef80 Branch: refs/heads/master Commit: cc30ef8009b82c71a4b8e9caba82ed141761ab85 Parents: 224375c Author: Feng Liu Authored: Fri Dec 29 18:48:47 2017 +0800 Committer: gatorsmile Committed: Fri Dec 29 18:48:47 2017 +0800 -- .../spark/sql/execution/SparkStrategies.scala | 75 .../execution/joins/BroadcastJoinSuite.scala| 75 +++- .../sql/execution/metric/SQLMetricsSuite.scala | 15 ++-- 3 files changed, 116 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cc30ef80/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 6b3f301..0ed7c2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -158,45 +158,65 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def smallerSide = if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft - val buildRight = canBuildRight && right.stats.hints.broadcast - val buildLeft = canBuildLeft && left.stats.hints.broadcast - - if (buildRight && buildLeft) { + if (canBuildRight && canBuildLeft) { // Broadcast smaller side base on its estimated physical size // if both sides have broadcast hint smallerSide - } else if (buildRight) { + } else if (canBuildRight) { BuildRight - } else if (buildLeft) { + } else if (canBuildLeft) { BuildLeft - } else if (canBuildRight && canBuildLeft) { + } else { // for the last default broadcast nested loop join smallerSide - } else { -throw new AnalysisException("Can not decide which side to broadcast for this join") } } +private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) + : Boolean = { + val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast + val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast + buildLeft || buildRight +} + +private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: 
LogicalPlan) + : BuildSide = { + val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast + val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast + broadcastSide(buildLeft, buildRight, left, right) +} + +private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) + : Boolean = { + val buildLeft = canBuildLeft(joinType) && canBroadcast(left) + val buildRight = canBuildRight(joinType) && canBroadcast(right) + buildLeft || buildRight +} + +private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) + : BuildSide = { + val buildLeft = canBuildLeft(joinType) && canBroadcast(left) + val buildRight = canBuildRight(joinType) && canBroadcast(right) + broadcastSide(buildLeft, buildRight, left, right) +} + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { // --- Broadc
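A hedged sketch of the planner behaviour described above; the `spark` session, table names and sizes are assumptions of this example.

```
import org.apache.spark.sql.functions.broadcast

val big = spark.range(0L, 10000000L).toDF("id")
val small = spark.range(0L, 100L).toDF("id")

// 1. An explicit hint still wins: the hinted side is the one that gets broadcast.
big.join(broadcast(small), "id").explain()

// 2. Without a hint, the planner now compares the estimated sizes of the two sides
//    (rather than always preferring the right side) and broadcasts the smaller one,
//    provided it is under the broadcast threshold.
small.join(big, "id").explain()
```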
svn commit: r23943 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_29_00_01-224375c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Dec 29 08:15:45 2017 New Revision: 23943 Log: Apache Spark 2.3.0-SNAPSHOT-2017_12_29_00_01-224375c docs [This commit notification would consist of 1425 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]