spark git commit: [SPARK-22060][ML] Fix CrossValidator/TrainValidationSplit param persist/load bug
Repository: spark
Updated Branches:
  refs/heads/master 3e6a714c9 -> f180b6534

[SPARK-22060][ML] Fix CrossValidator/TrainValidationSplit param persist/load bug

## What changes were proposed in this pull request?

Currently the param persist/load logic of CrossValidator/TrainValidationSplit is hardcoded, unlike other ML estimators. This causes a persistence bug for the newly added `parallelism` param. I refactored the related code to avoid hardcoding param persist/load, which also fixes the `parallelism` persistence bug. This refactoring is very useful because we will add more new params in #19208, and hardcoded param persisting/loading makes adding new params very troublesome.

## How was this patch tested?

Test added.

Author: WeichenXu

Closes #19278 from WeichenXu123/fix-tuning-param-bug.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f180b653
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f180b653
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f180b653

Branch: refs/heads/master
Commit: f180b65343e706c60b995a3d46d0391612bda966
Parents: 3e6a714
Author: WeichenXu
Authored: Fri Sep 22 18:15:01 2017 -0700
Committer: Joseph K. Bradley
Committed: Fri Sep 22 18:15:01 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/ml/tuning/CrossValidator.scala | 17 +++
 .../spark/ml/tuning/TrainValidationSplit.scala  | 18
 .../spark/ml/tuning/ValidatorParams.scala       | 22 +++-
 .../org/apache/spark/ml/util/ReadWrite.scala    | 20 +-
 .../spark/ml/tuning/CrossValidatorSuite.scala   |  3 +++
 .../ml/tuning/TrainValidationSplitSuite.scala   |  4 +++-
 6 files changed, 46 insertions(+), 38 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/f180b653/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
index ce2a3a2..7c81cb9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
@@ -212,14 +212,13 @@ object CrossValidator extends MLReadable[CrossValidator] {
       val (metadata, estimator, evaluator, estimatorParamMaps) =
         ValidatorParams.loadImpl(path, sc, className)
-      val numFolds = (metadata.params \ "numFolds").extract[Int]
-      val seed = (metadata.params \ "seed").extract[Long]
-      new CrossValidator(metadata.uid)
+      val cv = new CrossValidator(metadata.uid)
         .setEstimator(estimator)
         .setEvaluator(evaluator)
         .setEstimatorParamMaps(estimatorParamMaps)
-        .setNumFolds(numFolds)
-        .setSeed(seed)
+      DefaultParamsReader.getAndSetParams(cv, metadata,
+        skipParams = Option(List("estimatorParamMaps")))
+      cv
     }
   }
 }
@@ -302,17 +301,17 @@ object CrossValidatorModel extends MLReadable[CrossValidatorModel] {
       val (metadata, estimator, evaluator, estimatorParamMaps) =
         ValidatorParams.loadImpl(path, sc, className)
-      val numFolds = (metadata.params \ "numFolds").extract[Int]
-      val seed = (metadata.params \ "seed").extract[Long]
       val bestModelPath = new Path(path, "bestModel").toString
       val bestModel = DefaultParamsReader.loadParamsInstance[Model[_]](bestModelPath, sc)
       val avgMetrics = (metadata.metadata \ "avgMetrics").extract[Seq[Double]].toArray
+
       val model = new CrossValidatorModel(metadata.uid, bestModel, avgMetrics)
       model.set(model.estimator, estimator)
         .set(model.evaluator, evaluator)
         .set(model.estimatorParamMaps, estimatorParamMaps)
-        .set(model.numFolds, numFolds)
-        .set(model.seed, seed)
+      DefaultParamsReader.getAndSetParams(model, metadata,
+        skipParams = Option(List("estimatorParamMaps")))
+      model
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f180b653/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
index 16db0f5..6e3ad40 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.ml.tuning
 
+import java.io.IOException
 import java.util.{List => JList}
 import
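To see the user-visible effect of this fix, here is a minimal PySpark sketch (assuming Spark 2.3+, where the `parallelism` param and this fix are available; the temp path and grid values are illustrative): a saved and reloaded `CrossValidator` now retains generically persisted params such as `parallelism` instead of silently dropping them.

```python
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

spark = SparkSession.builder.master("local[2]").getOrCreate()

lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1]).build()
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=3,
                    parallelism=2)

path = "/tmp/cv-persist-demo"  # illustrative path
cv.write().overwrite().save(path)
loaded = CrossValidator.load(path)

# Before this fix, the hardcoded persistence path dropped params such as
# `parallelism`; with generic param persistence they round-trip.
assert loaded.getParallelism() == 2
assert loaded.getNumFolds() == 3
```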
[2/2] spark git commit: Preparing development version 2.1.3-SNAPSHOT
Preparing development version 2.1.3-SNAPSHOT

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03db7214
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03db7214
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03db7214

Branch: refs/heads/branch-2.1
Commit: 03db7214931a6a082154c9acf50d976687500670
Parents: fabbb7f
Author: Holden Karau
Authored: Fri Sep 22 08:07:45 2017 -0700
Committer: Holden Karau
Committed: Fri Sep 22 08:07:45 2017 -0700

----------------------------------------------------------------------
 R/pkg/DESCRIPTION                          | 2 +-
 assembly/pom.xml                           | 2 +-
 common/network-common/pom.xml              | 2 +-
 common/network-shuffle/pom.xml             | 2 +-
 common/network-yarn/pom.xml                | 2 +-
 common/sketch/pom.xml                      | 2 +-
 common/tags/pom.xml                        | 2 +-
 common/unsafe/pom.xml                      | 2 +-
 core/pom.xml                               | 2 +-
 docs/_config.yml                           | 4 ++--
 examples/pom.xml                           | 2 +-
 external/docker-integration-tests/pom.xml  | 2 +-
 external/flume-assembly/pom.xml            | 2 +-
 external/flume-sink/pom.xml                | 2 +-
 external/flume/pom.xml                     | 2 +-
 external/java8-tests/pom.xml               | 2 +-
 external/kafka-0-10-assembly/pom.xml       | 2 +-
 external/kafka-0-10-sql/pom.xml            | 2 +-
 external/kafka-0-10/pom.xml                | 2 +-
 external/kafka-0-8-assembly/pom.xml        | 2 +-
 external/kafka-0-8/pom.xml                 | 2 +-
 external/kinesis-asl-assembly/pom.xml      | 2 +-
 external/kinesis-asl/pom.xml               | 2 +-
 external/spark-ganglia-lgpl/pom.xml        | 2 +-
 graphx/pom.xml                             | 2 +-
 launcher/pom.xml                           | 2 +-
 mesos/pom.xml                              | 2 +-
 mllib-local/pom.xml                        | 2 +-
 mllib/pom.xml                              | 2 +-
 pom.xml                                    | 2 +-
 python/pyspark/version.py                  | 2 +-
 repl/pom.xml                               | 2 +-
 sql/catalyst/pom.xml                       | 2 +-
 sql/core/pom.xml                           | 2 +-
 sql/hive-thriftserver/pom.xml              | 2 +-
 sql/hive/pom.xml                           | 2 +-
 streaming/pom.xml                          | 2 +-
 tools/pom.xml                              | 2 +-
 yarn/pom.xml                               | 2 +-
 39 files changed, 40 insertions(+), 40 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/03db7214/R/pkg/DESCRIPTION
----------------------------------------------------------------------
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 899d410..6c380b6 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 2.1.2
+Version: 2.1.3
 Title: R Frontend for Apache Spark
 Description: Provides an R Frontend for Apache Spark.
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),

http://git-wip-us.apache.org/repos/asf/spark/blob/03db7214/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 133f8e6..e9f915a 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
 org.apache.spark
 spark-parent_2.11
-2.1.2
+2.1.3-SNAPSHOT
 ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/03db7214/common/network-common/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index d2631e4..7e203e7 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
 org.apache.spark
 spark-parent_2.11
-2.1.2
+2.1.3-SNAPSHOT
 ../../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/03db7214/common/network-shuffle/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index c12d480..92dd275 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
 org.apache.spark
 spark-parent_2.11
-2.1.2
+2.1.3-SNAPSHOT
 ../../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/03db7214/common/network-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index d22db36..abca418 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
 org.apache.spark
 spark-parent_2.11
-2.1.2
+2.1.3-SNAPSHOT
[1/2] spark git commit: Preparing Spark release v2.1.2-rc2
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 d930bbb40 -> 03db72149

Preparing Spark release v2.1.2-rc2

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fabbb7f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fabbb7f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fabbb7f5

Branch: refs/heads/branch-2.1
Commit: fabbb7f59e47590114366d14e15fbbff8c88593c
Parents: d930bbb
Author: Holden Karau
Authored: Fri Sep 22 08:07:37 2017 -0700
Committer: Holden Karau
Committed: Fri Sep 22 08:07:37 2017 -0700

----------------------------------------------------------------------
 R/pkg/DESCRIPTION                          | 2 +-
 assembly/pom.xml                           | 2 +-
 common/network-common/pom.xml              | 2 +-
 common/network-shuffle/pom.xml             | 2 +-
 common/network-yarn/pom.xml                | 2 +-
 common/sketch/pom.xml                      | 2 +-
 common/tags/pom.xml                        | 2 +-
 common/unsafe/pom.xml                      | 2 +-
 core/pom.xml                               | 2 +-
 docs/_config.yml                           | 4 ++--
 examples/pom.xml                           | 2 +-
 external/docker-integration-tests/pom.xml  | 2 +-
 external/flume-assembly/pom.xml            | 2 +-
 external/flume-sink/pom.xml                | 2 +-
 external/flume/pom.xml                     | 2 +-
 external/java8-tests/pom.xml               | 2 +-
 external/kafka-0-10-assembly/pom.xml       | 2 +-
 external/kafka-0-10-sql/pom.xml            | 2 +-
 external/kafka-0-10/pom.xml                | 2 +-
 external/kafka-0-8-assembly/pom.xml        | 2 +-
 external/kafka-0-8/pom.xml                 | 2 +-
 external/kinesis-asl-assembly/pom.xml      | 2 +-
 external/kinesis-asl/pom.xml               | 2 +-
 external/spark-ganglia-lgpl/pom.xml        | 2 +-
 graphx/pom.xml                             | 2 +-
 launcher/pom.xml                           | 2 +-
 mesos/pom.xml                              | 2 +-
 mllib-local/pom.xml                        | 2 +-
 mllib/pom.xml                              | 2 +-
 pom.xml                                    | 2 +-
 python/pyspark/version.py                  | 2 +-
 repl/pom.xml                               | 2 +-
 sql/catalyst/pom.xml                       | 2 +-
 sql/core/pom.xml                           | 2 +-
 sql/hive-thriftserver/pom.xml              | 2 +-
 sql/hive/pom.xml                           | 2 +-
 streaming/pom.xml                          | 2 +-
 tools/pom.xml                              | 2 +-
 yarn/pom.xml                               | 2 +-
 39 files changed, 40 insertions(+), 40 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/fabbb7f5/R/pkg/DESCRIPTION
----------------------------------------------------------------------
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 6c380b6..899d410 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 2.1.3
+Version: 2.1.2
 Title: R Frontend for Apache Spark
 Description: Provides an R Frontend for Apache Spark.
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),

http://git-wip-us.apache.org/repos/asf/spark/blob/fabbb7f5/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index e9f915a..133f8e6 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
 org.apache.spark
 spark-parent_2.11
-2.1.3-SNAPSHOT
+2.1.2
 ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/fabbb7f5/common/network-common/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 7e203e7..d2631e4 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
 org.apache.spark
 spark-parent_2.11
-2.1.3-SNAPSHOT
+2.1.2
 ../../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/fabbb7f5/common/network-shuffle/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 92dd275..c12d480 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
 org.apache.spark
 spark-parent_2.11
-2.1.3-SNAPSHOT
+2.1.2
 ../../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/fabbb7f5/common/network-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index abca418..d22db36 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
[spark] Git Push Summary
Repository: spark
Updated Tags: refs/tags/v2.1.2-rc2 [created] fabbb7f59
spark git commit: [SPARK-21766][PYSPARK][SQL] DataFrame toPandas() raises ValueError with nullable int columns
Repository: spark
Updated Branches:
  refs/heads/master d2b2932d8 -> 3e6a714c9

[SPARK-21766][PYSPARK][SQL] DataFrame toPandas() raises ValueError with nullable int columns

## What changes were proposed in this pull request?

When calling `DataFrame.toPandas()` (without Arrow enabled), if there is an `IntegralType` column (`IntegerType`, `ShortType`, `ByteType`) that has null values, the following exception is thrown:

    ValueError: Cannot convert non-finite values (NA or inf) to integer

This is because the null values are first converted to float NaN during the construction of the Pandas DataFrame in `from_records`, and the subsequent attempt to convert the column back to integer then fails. The fix checks whether the conversion would fail in this way; if so, we skip the conversion and keep the type inferred by Pandas.

Closes #18945

## How was this patch tested?

Added pyspark test.

Author: Liang-Chi Hsieh

Closes #19319 from viirya/SPARK-21766.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e6a714c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e6a714c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e6a714c

Branch: refs/heads/master
Commit: 3e6a714c9ee97ef13b3f2010babded3b63fd9d74
Parents: d2b2932
Author: Liang-Chi Hsieh
Authored: Fri Sep 22 22:39:47 2017 +0900
Committer: hyukjinkwon
Committed: Fri Sep 22 22:39:47 2017 +0900

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py | 13 ++---
 python/pyspark/sql/tests.py     | 12 
 2 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/3e6a714c/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 88ac413..7b81a0b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -37,6 +37,7 @@ from pyspark.sql.types import _parse_datatype_json_string
 from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
 from pyspark.sql.readwriter import DataFrameWriter
 from pyspark.sql.streaming import DataStreamWriter
+from pyspark.sql.types import IntegralType
 from pyspark.sql.types import *
 
 __all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
@@ -1891,14 +1892,20 @@ class DataFrame(object):
                       "if using spark.sql.execution.arrow.enable=true"
                 raise ImportError("%s\n%s" % (e.message, msg))
         else:
+            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
+
             dtype = {}
             for field in self.schema:
                 pandas_type = _to_corrected_pandas_type(field.dataType)
-                if pandas_type is not None:
+                # SPARK-21766: if an integer field is nullable and has null values, it can be
+                # inferred by pandas as float column. Once we convert the column with NaN back
+                # to integer type e.g., np.int16, we will hit exception. So we use the inferred
+                # float type, not the corrected type from the schema in this case.
+                if pandas_type is not None and \
+                    not(isinstance(field.dataType, IntegralType) and field.nullable and
+                        pdf[field.name].isnull().any()):
                     dtype[field.name] = pandas_type
 
-            pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
-
             for f, t in dtype.items():
                 pdf[f] = pdf[f].astype(t, copy=False)
             return pdf

http://git-wip-us.apache.org/repos/asf/spark/blob/3e6a714c/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index ab76c48..3db8bee 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2564,6 +2564,18 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertEquals(types[2], np.bool)
         self.assertEquals(types[3], np.float32)
 
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
+    def test_to_pandas_avoid_astype(self):
+        import numpy as np
+        schema = StructType().add("a", IntegerType()).add("b", StringType())\
+                             .add("c", IntegerType())
+        data = [(1, "foo", 16777220), (None, "bar", None)]
+        df = self.spark.createDataFrame(data, schema)
+        types = df.toPandas().dtypes
+        self.assertEquals(types[0], np.float64)  # doesn't convert to np.int32 due to NaN value.
+        self.assertEquals(types[1], np.object)
+
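The new test above pins down the resulting behavior; the following minimal sketch shows it from the user's side (assuming a running SparkSession named `spark`, Arrow disabled, and a Spark build containing this fix; the column names and data are illustrative):

```python
from pyspark.sql.types import StructType, IntegerType, StringType

schema = (StructType()
          .add("a", IntegerType())   # nullable int column that will hold a null
          .add("b", StringType()))
df = spark.createDataFrame([(1, "foo"), (None, "bar")], schema)

pdf = df.toPandas()
# With a null present, column "a" keeps the Pandas-inferred float64 dtype
# instead of being force-cast to int32 (which would raise the ValueError).
print(pdf.dtypes)  # a: float64, b: object
```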
spark git commit: [SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data
Repository: spark
Updated Branches:
  refs/heads/master 10e37f6eb -> d2b2932d8

[SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data

## What changes were proposed in this pull request?

`OffHeapColumnVector.reserveInternal()` will only copy already inserted values during reallocation if `data != null`. For vectors containing arrays or structs this is incorrect, since the field `data` is not used at all there. We need to check `nulls` instead.

## How was this patch tested?

Adds new tests to `ColumnVectorSuite` that reproduce the errors.

Author: Ala Luszczak

Closes #19308 from ala/vector-realloc.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2b2932d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2b2932d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2b2932d

Branch: refs/heads/master
Commit: d2b2932d8be01dee31983121f6fffd16177bf48a
Parents: 10e37f6
Author: Ala Luszczak
Authored: Fri Sep 22 15:31:43 2017 +0200
Committer: Herman van Hovell
Committed: Fri Sep 22 15:31:43 2017 +0200

----------------------------------------------------------------------
 .../vectorized/OffHeapColumnVector.java |  2 +-
 .../vectorized/ColumnVectorSuite.scala  | 26 
 2 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 3568275..e1d3685 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -515,7 +515,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
   // Split out the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
-    int oldCapacity = (this.data == 0L) ? 0 : capacity;
+    int oldCapacity = (nulls == 0L) ? 0 : capacity;
     if (this.resultArray != null) {
       this.lengthData =
         Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);

http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 998067a..f7b06c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -198,4 +198,30 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 456)
     assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 5.67)
   }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts array data") {
+    val arrayType = ArrayType(IntegerType, true)
+    testVector = new OffHeapColumnVector(8, arrayType)
+
+    val data = testVector.arrayData()
+    (0 until 8).foreach(i => data.putInt(i, i))
+    (0 until 8).foreach(i => testVector.putArray(i, i, 1))
+
+    // Increase vector's capacity and reallocate the data to new bigger buffers.
+    testVector.reserve(16)
+
+    // Check that none of the values got lost/overwritten.
+    val array = new ColumnVector.Array(testVector)
+    (0 until 8).foreach { i =>
+      assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(i))
+    }
+  }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts struct nullability") {
+    val structType = new StructType().add("int", IntegerType).add("double", DoubleType)
+    testVector = new OffHeapColumnVector(8, structType)
+    (0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i))
+    testVector.reserve(16)
+    (0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
+  }
 }
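The pattern behind this bug can be illustrated outside of Spark. Below is a toy Python sketch (not Spark code; all names are hypothetical) of what goes wrong when the reallocation path derives the old capacity from a buffer that a given vector type never allocates:

```python
# Toy illustration of the bug pattern fixed above: if the "old capacity"
# check keys off an optional buffer, reallocation treats the vector as
# empty for types that never allocate it, and copies nothing.
class ToyVector:
    def __init__(self, capacity, is_struct):
        self.capacity = capacity
        self.nulls = [False] * capacity                      # always allocated
        self.data = None if is_struct else [0] * capacity    # unused for structs

    def reserve_buggy(self, new_capacity):
        # Mirrors `(this.data == 0L) ? 0 : capacity`: wrong for structs/arrays.
        old = 0 if self.data is None else self.capacity
        self.nulls = self.nulls[:old] + [False] * (new_capacity - old)
        self.capacity = new_capacity

    def reserve_fixed(self, new_capacity):
        # Mirrors `(nulls == 0L) ? 0 : capacity`: nulls always exists, so the
        # already-written values are preserved during the copy.
        old = self.capacity
        self.nulls = self.nulls + [False] * (new_capacity - old)
        self.capacity = new_capacity

v = ToyVector(4, is_struct=True)
v.nulls = [True, False, True, False]
v.reserve_buggy(8)
print(v.nulls[:4])  # [False, False, False, False] -> nullability silently lost
```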
spark git commit: [UI][STREAMING] Modify the title, 'Records' instead of 'Input Size'
Repository: spark
Updated Branches:
  refs/heads/master 27fc536d9 -> 10e37f6eb

[UI][STREAMING] Modify the title, 'Records' instead of 'Input Size'

## What changes were proposed in this pull request?

Spark Streaming processes data in records, so the column title should be 'Records', not 'Input Size'.

Fix before:
![1](https://user-images.githubusercontent.com/26266482/30099599-c64d4a8a-9318-11e7-8a8d-1ca99b409323.png)

Fix after:
![2](https://user-images.githubusercontent.com/26266482/30099609-cd4df7d0-9318-11e7-8a27-dbaec6797bb1.png)

## How was this patch tested?

Manual tests.

Author: guoxiaolong

Closes #19144 from guoxiaolongzte/streamingUI.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10e37f6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10e37f6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10e37f6e

Branch: refs/heads/master
Commit: 10e37f6eb6819c9233830c0d97e8fd1c713be0f1
Parents: 27fc536
Author: guoxiaolong
Authored: Fri Sep 22 11:51:57 2017 +0100
Committer: Sean Owen
Committed: Fri Sep 22 11:51:57 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/ui/AllBatchesTable.scala   | 2 +-
 .../test/scala/org/apache/spark/streaming/UISeleniumSuite.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/10e37f6e/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 70b4bb4..f1070e9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -25,7 +25,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
 
   protected def columns: Seq[Node] = {
     Batch Time
-      Input Size
+      Records
       Scheduling Delay
       {SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")}

http://git-wip-us.apache.org/repos/asf/spark/blob/10e37f6e/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index e7cec99..f2204a1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -121,11 +121,11 @@ class UISeleniumSuite
       h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
 
       findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
-        List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",
+        List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
           "Output Ops: Succeeded/Total", "Status")
       }
       findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
-        List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",
+        List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
           "Total Delay (?)", "Output Ops: Succeeded/Total")
       }
spark git commit: [SPARK-21190][PYSPARK] Python Vectorized UDFs
Repository: spark
Updated Branches:
  refs/heads/master 8f130ad40 -> 27fc536d9

[SPARK-21190][PYSPARK] Python Vectorized UDFs

This PR adds vectorized UDFs to the Python API.

**Proposed API**
Introduce a flag to turn on vectorization for a defined UDF, for example:

```
@pandas_udf(DoubleType())
def plus(a, b):
    return a + b
```

or

```
plus = pandas_udf(lambda a, b: a + b, DoubleType())
```

Usage is the same as normal UDFs.

0-parameter UDFs: pandas_udf functions can declare an optional `**kwargs` which, when the UDF is evaluated, will contain a key "size" giving the required length of the output. For example:

```
@pandas_udf(LongType())
def f0(**kwargs):
    return pd.Series(1).repeat(kwargs["size"])

df.select(f0())
```

Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are available.

- [x] Fix support for promoted types with null values
- [ ] Discuss 0-param UDF API (use of kwargs)
- [x] Add tests for chained UDFs
- [ ] Discuss behavior when pyarrow not installed / enabled
- [ ] Cleanup pydoc and add user docs

Author: Bryan Cutler
Author: Takuya UESHIN

Closes #18659 from BryanCutler/arrow-vectorized-udfs-SPARK-21404.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27fc536d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27fc536d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27fc536d

Branch: refs/heads/master
Commit: 27fc536d9a54eccef7d1cbbe2a6a008043d62ba4
Parents: 8f130ad
Author: Bryan Cutler
Authored: Fri Sep 22 16:17:41 2017 +0800
Committer: Wenchen Fan
Committed: Fri Sep 22 16:17:50 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala |  22 ++-
 python/pyspark/serializers.py                   |  65 +-
 python/pyspark/sql/functions.py                 |  49 +++--
 python/pyspark/sql/tests.py                     | 197 +++
 python/pyspark/sql/types.py                     |  27 +++
 python/pyspark/worker.py                        |  57 --
 .../execution/python/ArrowEvalPythonExec.scala  |  61 ++
 .../execution/python/BatchEvalPythonExec.scala  | 193 ++
 .../sql/execution/python/EvalPythonExec.scala   | 142 +
 .../execution/python/ExtractPythonUDFs.scala    |  11 +-
 .../spark/sql/execution/python/PythonUDF.scala  |   3 +-
 .../python/UserDefinedPythonFunction.scala      |   5 +-
 .../python/BatchEvalPythonExecSuite.scala       |   7 +-
 13 files changed, 666 insertions(+), 173 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/27fc536d/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 3377101..86d0405 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -83,10 +83,23 @@ private[spark] case class PythonFunction(
  */
 private[spark] case class ChainedPythonFunctions(funcs: Seq[PythonFunction])
 
+/**
+ * Enumerate the type of command that will be sent to the Python worker
+ */
+private[spark] object PythonEvalType {
+  val NON_UDF = 0
+  val SQL_BATCHED_UDF = 1
+  val SQL_PANDAS_UDF = 2
+}
+
 private[spark] object PythonRunner {
   def apply(func: PythonFunction, bufferSize: Int, reuse_worker: Boolean): PythonRunner = {
     new PythonRunner(
-      Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuse_worker, false, Array(Array(0)))
+      Seq(ChainedPythonFunctions(Seq(func))),
+      bufferSize,
+      reuse_worker,
+      PythonEvalType.NON_UDF,
+      Array(Array(0)))
   }
 }
 
@@ -100,7 +113,7 @@ private[spark] class PythonRunner(
     funcs: Seq[ChainedPythonFunctions],
     bufferSize: Int,
     reuse_worker: Boolean,
-    isUDF: Boolean,
+    evalType: Int,
     argOffsets: Array[Array[Int]])
   extends Logging {
 
@@ -309,8 +322,8 @@ private[spark] class PythonRunner(
         }
         dataOut.flush()
         // Serialized command:
-        if (isUDF) {
-          dataOut.writeInt(1)
+        dataOut.writeInt(evalType)
+        if (evalType != PythonEvalType.NON_UDF) {
           dataOut.writeInt(funcs.length)
           funcs.zip(argOffsets).foreach { case (chained, offsets) =>
             dataOut.writeInt(offsets.length)
@@ -324,7 +337,6 @@ private[spark] class PythonRunner(
           }
         } else {
-          dataOut.writeInt(0)
           val command = funcs.head.funcs.head.command
           dataOut.writeInt(command.length)
           dataOut.write(command)
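For reference, here is a minimal end-to-end sketch of the vectorized UDF API this commit introduces, assuming PySpark 2.3+ with pyarrow and pandas installed (the column name and toy data are illustrative, not from the patch):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.range(4).selectExpr("cast(id as double) as v")

@pandas_udf(DoubleType())
def plus_one(v):
    # `v` arrives as a pandas.Series: the whole batch is processed at once
    # instead of row by row, which is what makes the UDF "vectorized".
    return v + 1.0

df.select(plus_one("v")).show()
```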
spark git commit: [SPARK-22072][SPARK-22071][BUILD] Improve release build scripts
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 090b987e6 -> de6274a58

[SPARK-22072][SPARK-22071][BUILD] Improve release build scripts

## What changes were proposed in this pull request?

Check the JDK version (with javac) and use SPARK_VERSION for publish-release.

## How was this patch tested?

Manually tried a local build with a wrong JDK / JAVA_HOME and built a local release (LFTP disabled).

Author: Holden Karau

Closes #19312 from holdenk/improve-release-scripts-r2.

(cherry picked from commit 8f130ad40178e35fecb3f2ba4a61ad23e6a90e3d)
Signed-off-by: Holden Karau

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de6274a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de6274a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de6274a5

Branch: refs/heads/branch-2.2
Commit: de6274a585fdc2eb9252dc5d5688ce3f3e9e0c39
Parents: 090b987
Author: Holden Karau
Authored: Fri Sep 22 00:14:57 2017 -0700
Committer: Holden Karau
Committed: Fri Sep 22 00:15:12 2017 -0700

----------------------------------------------------------------------
 dev/create-release/release-build.sh | 33 ++--
 1 file changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/de6274a5/dev/create-release/release-build.sh
----------------------------------------------------------------------
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index a72307a..f93a96b 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -31,8 +31,8 @@ Top level targets are
 All other inputs are environment variables
 
 GIT_REF - Release tag or commit to build from
-SPARK_VERSION - Release identifier used when publishing
-SPARK_PACKAGE_VERSION - Release identifier in top level package directory
+SPARK_VERSION - Version of Spark being built (e.g. 2.1.2)
+SPARK_PACKAGE_VERSION - Release identifier in top level package directory (e.g. 2.1.2-rc1)
 REMOTE_PARENT_DIR - Parent in which to create doc or release builds.
 REMOTE_PARENT_MAX_LENGTH - If set, parent directory will be cleaned to only
  have this number of subdirectories (by deleting old ones). WARNING: This deletes data.
@@ -95,6 +95,33 @@ if [ -z "$SPARK_VERSION" ]; then
     | grep -v INFO | grep -v WARNING | grep -v Download)
 fi
 
+# Verify we have the right java version set
+if [ -z "$JAVA_HOME" ]; then
+  echo "Please set JAVA_HOME."
+  exit 1
+fi
+
+java_version=$("${JAVA_HOME}"/bin/javac -version 2>&1 | cut -d " " -f 2)
+
+if [[ ! $SPARK_VERSION < "2.2." ]]; then
+  if [[ $java_version < "1.8." ]]; then
+    echo "Java version $java_version is less than required 1.8 for 2.2+"
+    echo "Please set JAVA_HOME correctly."
+    exit 1
+  fi
+else
+  if [[ $java_version > "1.7." ]]; then
+    if [ -z "$JAVA_7_HOME" ]; then
+      echo "Java version $java_version is higher than required 1.7 for pre-2.2"
+      echo "Please set JAVA_HOME correctly."
+      exit 1
+    else
+      JAVA_HOME="$JAVA_7_HOME"
+    fi
+  fi
+fi
+
+
 if [ -z "$SPARK_PACKAGE_VERSION" ]; then
   SPARK_PACKAGE_VERSION="${SPARK_VERSION}-$(date +%Y_%m_%d_%H_%M)-${git_hash}"
 fi
@@ -318,6 +345,8 @@ if [[ "$1" == "publish-snapshot" ]]; then
 fi
 
 if [[ "$1" == "publish-release" ]]; then
+  SPARK_VERSION=$SPARK_PACKAGE_VERSION
+
   cd spark
   # Publish Spark to Maven release repo
   echo "Publishing Spark checkout at '$GIT_REF' ($git_hash)"
spark git commit: [SPARK-22072][SPARK-22071][BUILD] Improve release build scripts
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1a4b6eea8 -> d930bbb40

[SPARK-22072][SPARK-22071][BUILD] Improve release build scripts

## What changes were proposed in this pull request?

Check the JDK version (with javac) and use SPARK_VERSION for publish-release.

## How was this patch tested?

Manually tried a local build with a wrong JDK / JAVA_HOME and built a local release (LFTP disabled).

Author: Holden Karau

Closes #19312 from holdenk/improve-release-scripts-r2.

(cherry picked from commit 8f130ad40178e35fecb3f2ba4a61ad23e6a90e3d)
Signed-off-by: Holden Karau

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d930bbb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d930bbb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d930bbb4

Branch: refs/heads/branch-2.1
Commit: d930bbb40648a39c44780ba51513489923babd3e
Parents: 1a4b6ee
Author: Holden Karau
Authored: Fri Sep 22 00:14:57 2017 -0700
Committer: Holden Karau
Committed: Fri Sep 22 00:15:27 2017 -0700

----------------------------------------------------------------------
 dev/create-release/release-build.sh | 33 ++--
 1 file changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d930bbb4/dev/create-release/release-build.sh
----------------------------------------------------------------------
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index c4ddc21..fa889d9 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -31,8 +31,8 @@ Top level targets are
 All other inputs are environment variables
 
 GIT_REF - Release tag or commit to build from
-SPARK_VERSION - Release identifier used when publishing
-SPARK_PACKAGE_VERSION - Release identifier in top level package directory
+SPARK_VERSION - Version of Spark being built (e.g. 2.1.2)
+SPARK_PACKAGE_VERSION - Release identifier in top level package directory (e.g. 2.1.2-rc1)
 REMOTE_PARENT_DIR - Parent in which to create doc or release builds.
 REMOTE_PARENT_MAX_LENGTH - If set, parent directory will be cleaned to only
  have this number of subdirectories (by deleting old ones). WARNING: This deletes data.
@@ -95,6 +95,33 @@ if [ -z "$SPARK_VERSION" ]; then
     | grep -v INFO | grep -v WARNING | grep -v Download)
 fi
 
+# Verify we have the right java version set
+if [ -z "$JAVA_HOME" ]; then
+  echo "Please set JAVA_HOME."
+  exit 1
+fi
+
+java_version=$("${JAVA_HOME}"/bin/javac -version 2>&1 | cut -d " " -f 2)
+
+if [[ ! $SPARK_VERSION < "2.2." ]]; then
+  if [[ $java_version < "1.8." ]]; then
+    echo "Java version $java_version is less than required 1.8 for 2.2+"
+    echo "Please set JAVA_HOME correctly."
+    exit 1
+  fi
+else
+  if [[ $java_version > "1.7." ]]; then
+    if [ -z "$JAVA_7_HOME" ]; then
+      echo "Java version $java_version is higher than required 1.7 for pre-2.2"
+      echo "Please set JAVA_HOME correctly."
+      exit 1
+    else
+      JAVA_HOME="$JAVA_7_HOME"
+    fi
+  fi
+fi
+
+
 if [ -z "$SPARK_PACKAGE_VERSION" ]; then
   SPARK_PACKAGE_VERSION="${SPARK_VERSION}-$(date +%Y_%m_%d_%H_%M)-${git_hash}"
 fi
@@ -322,6 +349,8 @@ if [[ "$1" == "publish-snapshot" ]]; then
 fi
 
 if [[ "$1" == "publish-release" ]]; then
+  SPARK_VERSION=$SPARK_PACKAGE_VERSION
+
   cd spark
   # Publish Spark to Maven release repo
   echo "Publishing Spark checkout at '$GIT_REF' ($git_hash)"
spark git commit: [SPARK-22072][SPARK-22071][BUILD] Improve release build scripts
Repository: spark
Updated Branches:
  refs/heads/master 5960686e7 -> 8f130ad40

[SPARK-22072][SPARK-22071][BUILD] Improve release build scripts

## What changes were proposed in this pull request?

Check the JDK version (with javac) and use SPARK_VERSION for publish-release.

## How was this patch tested?

Manually tried a local build with a wrong JDK / JAVA_HOME and built a local release (LFTP disabled).

Author: Holden Karau

Closes #19312 from holdenk/improve-release-scripts-r2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f130ad4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f130ad4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f130ad4

Branch: refs/heads/master
Commit: 8f130ad40178e35fecb3f2ba4a61ad23e6a90e3d
Parents: 5960686
Author: Holden Karau
Authored: Fri Sep 22 00:14:57 2017 -0700
Committer: Holden Karau
Committed: Fri Sep 22 00:14:57 2017 -0700

----------------------------------------------------------------------
 dev/create-release/release-build.sh | 33 ++--
 1 file changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/8f130ad4/dev/create-release/release-build.sh
----------------------------------------------------------------------
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index f4a7f25..8de1d6a 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -31,8 +31,8 @@ Top level targets are
 All other inputs are environment variables
 
 GIT_REF - Release tag or commit to build from
-SPARK_VERSION - Release identifier used when publishing
-SPARK_PACKAGE_VERSION - Release identifier in top level package directory
+SPARK_VERSION - Version of Spark being built (e.g. 2.1.2)
+SPARK_PACKAGE_VERSION - Release identifier in top level package directory (e.g. 2.1.2-rc1)
 REMOTE_PARENT_DIR - Parent in which to create doc or release builds.
 REMOTE_PARENT_MAX_LENGTH - If set, parent directory will be cleaned to only
  have this number of subdirectories (by deleting old ones). WARNING: This deletes data.
@@ -104,6 +104,33 @@ if [ -z "$SPARK_VERSION" ]; then
     | grep -v INFO | grep -v WARNING | grep -v Download)
 fi
 
+# Verify we have the right java version set
+if [ -z "$JAVA_HOME" ]; then
+  echo "Please set JAVA_HOME."
+  exit 1
+fi
+
+java_version=$("${JAVA_HOME}"/bin/javac -version 2>&1 | cut -d " " -f 2)
+
+if [[ ! $SPARK_VERSION < "2.2." ]]; then
+  if [[ $java_version < "1.8." ]]; then
+    echo "Java version $java_version is less than required 1.8 for 2.2+"
+    echo "Please set JAVA_HOME correctly."
+    exit 1
+  fi
+else
+  if [[ $java_version > "1.7." ]]; then
+    if [ -z "$JAVA_7_HOME" ]; then
+      echo "Java version $java_version is higher than required 1.7 for pre-2.2"
+      echo "Please set JAVA_HOME correctly."
+      exit 1
+    else
+      JAVA_HOME="$JAVA_7_HOME"
+    fi
+  fi
+fi
+
+
 if [ -z "$SPARK_PACKAGE_VERSION" ]; then
   SPARK_PACKAGE_VERSION="${SPARK_VERSION}-$(date +%Y_%m_%d_%H_%M)-${git_hash}"
 fi
@@ -326,6 +353,8 @@ if [[ "$1" == "publish-snapshot" ]]; then
 fi
 
 if [[ "$1" == "publish-release" ]]; then
+  SPARK_VERSION=$SPARK_PACKAGE_VERSION
+
   cd spark
   # Publish Spark to Maven release repo
   echo "Publishing Spark checkout at '$GIT_REF' ($git_hash)"
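Note that the script's JDK gate relies on bash's lexicographic string comparison (`[[ a < b ]]`). The following toy Python sketch (a hypothetical helper, not part of the script) restates the same rule, including its string-comparison behavior; the version strings in the asserts are illustrative:

```python
# Toy restatement of the release script's JDK check. Python's `<` on strings
# is lexicographic, mirroring bash's `[[ a < b ]]` test used above.
def jdk_ok(spark_version: str, java_version: str) -> bool:
    if not (spark_version < "2.2."):       # Spark 2.2+ -> require Java 8
        return not (java_version < "1.8.")
    # pre-2.2 -> require Java 7 (a higher JDK needs JAVA_7_HOME instead)
    return not (java_version > "1.7.")

assert jdk_ok("2.2.1", "1.8.0_144")
assert not jdk_ok("2.2.1", "1.7.0_79")     # too old for 2.2+
assert not jdk_ok("2.1.2", "1.8.0_144")    # pre-2.2 build rejects Java 8
```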
spark git commit: [SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning
Repository: spark
Updated Branches:
  refs/heads/master 5ac96854c -> 5960686e7

[SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning

## What changes were proposed in this pull request?

Right now the calculation of SortMergeJoinExec's outputOrdering relies on the fact that its children have already been sorted on the join keys, while this is often not true until EnsureRequirements has been applied. So we ended up not getting the correct outputOrdering during the physical planning stage, before Sort nodes are added to the children.

For example, J = {A join B on key1 = key2}:
1. if A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
2. if A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
3. if A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should include "key1 ASC, sameOrderExp=c1"

So to fix this, I changed the behavior of getKeyOrdering(keys, childOutputOrdering) to:
1. If the childOutputOrdering satisfies (is a superset of) the required child ordering => childOutputOrdering
2. Otherwise => required child ordering

In addition, I organized the logic for deciding the relationship between two orderings into SparkPlan, so that it can be reused by EnsureRequirements and SortMergeJoinExec, and potentially other classes.

## How was this patch tested?

Added new test cases. Passed all integration tests.

Author: maryannxue

Closes #19281 from maryannxue/spark-21998.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5960686e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5960686e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5960686e

Branch: refs/heads/master
Commit: 5960686e791b5d6642a30c43c1de61e96e594a5e
Parents: 5ac9685
Author: maryannxue
Authored: Thu Sep 21 23:54:16 2017 -0700
Committer: gatorsmile
Committed: Thu Sep 21 23:54:16 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/SortOrder.scala    | 23 
 .../execution/exchange/EnsureRequirements.scala | 21 ++-
 .../sql/execution/joins/SortMergeJoinExec.scala | 17 --
 .../scala/org/apache/spark/sql/JoinSuite.scala  | 62 
 4 files changed, 102 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index abcb9a2..ff7c98f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -96,6 +96,29 @@ object SortOrder {
       sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
     new SortOrder(child, direction, direction.defaultNullOrdering, sameOrderExpressions)
   }
+
+  /**
+   * Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
+   *
+   * SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
+   * or of A's prefix. Here are examples of ordering A satisfying ordering B:
+   *
+   *   - ordering A is [x, y] and ordering B is [x]
+   *   - ordering A is [x(sameOrderExpressions=x1)] and ordering B is [x1]
+   *   - ordering A is [x(sameOrderExpressions=x1), y] and ordering B is [x1]
+   */
+  def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): Boolean = {
+    if (ordering2.isEmpty) {
+      true
+    } else if (ordering2.length > ordering1.length) {
+      false
+    } else {
+      ordering2.zip(ordering1).forall {
+        case (o2, o1) => o1.satisfies(o2)
+      }
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index b91d077..1da72f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -234,24 +234,11 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
 
     // Now that we've performed any necessary shuffles, add sorts to guarantee output
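The prefix rule that `orderingSatisfies` implements can be sketched in miniature. Below is a toy Python model (not Spark code) that uses plain equality on sort keys and omits the `sameOrderExpressions` equivalence that the real `SortOrder.satisfies` honors:

```python
# Toy model of the orderingSatisfies rule described above: ordering1
# satisfies ordering2 iff ordering2 is (equivalent to) a prefix of ordering1.
def ordering_satisfies(ordering1, ordering2):
    if not ordering2:                      # empty requirement: always satisfied
        return True
    if len(ordering2) > len(ordering1):    # requirement longer than what we have
        return False
    return all(o1 == o2 for o1, o2 in zip(ordering1, ordering2))

assert ordering_satisfies(["key1 ASC", "c1 ASC"], ["key1 ASC"])       # prefix
assert not ordering_satisfies(["key1 ASC"], ["key1 ASC", "c1 ASC"])   # too short
assert ordering_satisfies(["key1 ASC"], [])                           # empty requirement
```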