svn commit: r26754 - in /dev/spark/2.3.1-SNAPSHOT-2018_05_07_22_01-4dc6719-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue May 8 05:16:14 2018 New Revision: 26754 Log: Apache Spark 2.3.1-SNAPSHOT-2018_05_07_22_01-4dc6719 docs [This commit notification would consist of 1443 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24128][SQL] Mention configuration option in implicit CROSS JOIN error
Repository: spark Updated Branches: refs/heads/branch-2.3 3a22feab4 -> 4dc6719e9 [SPARK-24128][SQL] Mention configuration option in implicit CROSS JOIN error ## What changes were proposed in this pull request? Mention `spark.sql.crossJoin.enabled` in error message when an implicit `CROSS JOIN` is detected. ## How was this patch tested? `CartesianProductSuite` and `JoinSuite`. Author: Henry RobinsonCloses #21201 from henryr/spark-24128. (cherry picked from commit cd12c5c3ecf28f7b04f566c2057f9b65eb456b7d) Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dc6719e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dc6719e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dc6719e Branch: refs/heads/branch-2.3 Commit: 4dc6719e9afdd0f0ba134bfc69f042f7796ff2cd Parents: 3a22fea Author: Henry Robinson Authored: Tue May 8 12:21:33 2018 +0800 Committer: hyukjinkwon Committed: Tue May 8 12:21:54 2018 +0800 -- R/pkg/tests/fulltests/test_sparkSQL.R | 4 ++-- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala| 6 -- .../sql/catalyst/optimizer/CheckCartesianProductsSuite.scala | 2 +- sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 4 ++-- 4 files changed, 9 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4dc6719e/R/pkg/tests/fulltests/test_sparkSQL.R -- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index bed26ec..a73811e 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -2186,8 +2186,8 @@ test_that("join(), crossJoin() and merge() on a DataFrame", { expect_equal(count(where(join(df, df2), df$name == df2$name)), 3) # cartesian join expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }), - paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for", - " INNER join between logical plans).*")) + paste0(".*(org.apache.spark.sql.AnalysisException: Detected implicit cartesian", + " product for INNER join between logical plans).*")) joined <- crossJoin(df, df2) expect_equal(names(joined), c("age", "name", "name", "test")) http://git-wip-us.apache.org/repos/asf/spark/blob/4dc6719e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a28b6a0..c77e0f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1122,12 +1122,14 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper { case j @ Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, _) if isCartesianProduct(j) => throw new AnalysisException( -s"""Detected cartesian product for ${j.joinType.sql} join between logical plans +s"""Detected implicit cartesian product for ${j.joinType.sql} join between logical plans |${left.treeString(false).trim} |and |${right.treeString(false).trim} |Join condition is missing or trivial. 
- |Use the CROSS JOIN syntax to allow cartesian products between these relations.""" + |Either: use the CROSS JOIN syntax to allow cartesian products between these + |relations, or: enable implicit cartesian products by setting the configuration + |variable spark.sql.crossJoin.enabled=true""" .stripMargin) } } http://git-wip-us.apache.org/repos/asf/spark/blob/4dc6719e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala index 21220b3..788fedb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala +++
spark git commit: [SPARK-24128][SQL] Mention configuration option in implicit CROSS JOIN error
Repository: spark Updated Branches: refs/heads/master 0d63eb888 -> cd12c5c3e [SPARK-24128][SQL] Mention configuration option in implicit CROSS JOIN error ## What changes were proposed in this pull request? Mention `spark.sql.crossJoin.enabled` in error message when an implicit `CROSS JOIN` is detected. ## How was this patch tested? `CartesianProductSuite` and `JoinSuite`. Author: Henry RobinsonCloses #21201 from henryr/spark-24128. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd12c5c3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd12c5c3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd12c5c3 Branch: refs/heads/master Commit: cd12c5c3ecf28f7b04f566c2057f9b65eb456b7d Parents: 0d63eb8 Author: Henry Robinson Authored: Tue May 8 12:21:33 2018 +0800 Committer: hyukjinkwon Committed: Tue May 8 12:21:33 2018 +0800 -- R/pkg/tests/fulltests/test_sparkSQL.R | 4 ++-- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala| 6 -- .../sql/catalyst/optimizer/CheckCartesianProductsSuite.scala | 2 +- sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 4 ++-- 4 files changed, 9 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cd12c5c3/R/pkg/tests/fulltests/test_sparkSQL.R -- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 3a8866b..43725e0 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -2210,8 +2210,8 @@ test_that("join(), crossJoin() and merge() on a DataFrame", { expect_equal(count(where(join(df, df2), df$name == df2$name)), 3) # cartesian join expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }), - paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for", - " INNER join between logical plans).*")) + paste0(".*(org.apache.spark.sql.AnalysisException: Detected implicit cartesian", + " product for INNER join between logical plans).*")) joined <- crossJoin(df, df2) expect_equal(names(joined), c("age", "name", "name", "test")) http://git-wip-us.apache.org/repos/asf/spark/blob/cd12c5c3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 45f1395..bfa61116 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1182,12 +1182,14 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper { case j @ Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, _) if isCartesianProduct(j) => throw new AnalysisException( -s"""Detected cartesian product for ${j.joinType.sql} join between logical plans +s"""Detected implicit cartesian product for ${j.joinType.sql} join between logical plans |${left.treeString(false).trim} |and |${right.treeString(false).trim} |Join condition is missing or trivial. 
- |Use the CROSS JOIN syntax to allow cartesian products between these relations.""" + |Either: use the CROSS JOIN syntax to allow cartesian products between these + |relations, or: enable implicit cartesian products by setting the configuration + |variable spark.sql.crossJoin.enabled=true""" .stripMargin) } } http://git-wip-us.apache.org/repos/asf/spark/blob/cd12c5c3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala index 21220b3..788fedb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CheckCartesianProductsSuite.scala @@ -56,7 +56,7 @@ class CheckCartesianProductsSuite extends PlanTest { val thrownException = the
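For context, a minimal sketch of the two resolutions the new error message points to (illustrative data; not part of the patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("cross-join-demo").getOrCreate()
import spark.implicits._

val left = Seq(1, 2, 3).toDF("a")
val right = Seq("x", "y").toDF("b")

// Option 1: state the intent explicitly with the CROSS JOIN syntax.
left.crossJoin(right).show()

// Option 2: allow implicit cartesian products for the session, as the message now suggests.
spark.conf.set("spark.sql.crossJoin.enabled", "true")
left.join(right).show()   // no join condition; permitted instead of raising AnalysisException
```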
spark git commit: [SPARK-23975][ML] Add support of array input for all clustering methods
Repository: spark Updated Branches: refs/heads/master 76ecd0950 -> 0d63eb888 [SPARK-23975][ML] Add support of array input for all clustering methods ## What changes were proposed in this pull request? Add support for all of the clustering methods ## How was this patch tested? unit tests added Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Lu WANGCloses #21195 from ludatabricks/SPARK-23975-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d63eb88 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d63eb88 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d63eb88 Branch: refs/heads/master Commit: 0d63ebd17df747fb41d7ba254718bb7af3ae Parents: 76ecd09 Author: Lu WANG Authored: Mon May 7 20:08:41 2018 -0700 Committer: Xiangrui Meng Committed: Mon May 7 20:08:41 2018 -0700 -- .../spark/ml/clustering/BisectingKMeans.scala | 21 - .../spark/ml/clustering/GaussianMixture.scala | 12 +++-- .../org/apache/spark/ml/clustering/KMeans.scala | 31 +++-- .../org/apache/spark/ml/clustering/LDA.scala| 9 ++-- .../org/apache/spark/ml/util/DatasetUtils.scala | 13 +- .../org/apache/spark/ml/util/SchemaUtils.scala | 16 ++- .../ml/clustering/BisectingKMeansSuite.scala| 21 - .../ml/clustering/GaussianMixtureSuite.scala| 21 - .../spark/ml/clustering/KMeansSuite.scala | 48 ++-- .../apache/spark/ml/clustering/LDASuite.scala | 20 +++- .../apache/spark/ml/util/MLTestingUtils.scala | 23 +- 11 files changed, 147 insertions(+), 88 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala index addc12ac..438e53b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala @@ -22,17 +22,15 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model} -import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ import org.apache.spark.mllib.clustering.{BisectingKMeans => MLlibBisectingKMeans, BisectingKMeansModel => MLlibBisectingKMeansModel} -import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.VectorImplicits._ -import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} -import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.functions.udf import org.apache.spark.sql.types.{IntegerType, StructType} @@ -75,7 +73,7 @@ private[clustering] trait BisectingKMeansParams extends Params with HasMaxIter * @return output schema */ protected def validateAndTransformSchema(schema: StructType): StructType = { -SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT) +SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol) SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) } } @@ -113,7 +111,8 @@ class BisectingKMeansModel private[ml] ( override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) val 
predictUDF = udf((vector: Vector) => predict(vector)) -dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol +dataset.withColumn($(predictionCol), + predictUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol))) } @Since("2.0.0") @@ -132,9 +131,9 @@ class BisectingKMeansModel private[ml] ( */ @Since("2.0.0") def computeCost(dataset: Dataset[_]): Double = { -SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) -val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point } -parentModel.computeCost(data.map(OldVectors.fromML)) +SchemaUtils.validateVectorCompatibleColumn(dataset.schema, getFeaturesCol) +val data = DatasetUtils.columnToOldVector(dataset, getFeaturesCol) +parentModel.computeCost(data) } @Since("2.0.0") @@ -260,9 +259,7 @@ class BisectingKMeans @Since("2.0.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]):
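A small sketch of the user-facing effect of this change (assumes an active SparkSession named `spark`; the data is illustrative): the clustering estimators now accept a features column of numeric array type in addition to `Vector`.

```scala
import org.apache.spark.ml.clustering.KMeans

// "features" is an ArrayType(DoubleType) column rather than a VectorUDT column.
val df = spark.createDataFrame(Seq(
  Tuple1(Array(0.0, 0.0)),
  Tuple1(Array(1.0, 1.0)),
  Tuple1(Array(9.0, 8.0)),
  Tuple1(Array(8.0, 9.0))
)).toDF("features")

// Previously this failed schema validation, which required a Vector column.
val model = new KMeans().setK(2).setSeed(1L).fit(df)
model.clusterCenters.foreach(println)
```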
svn commit: r26748 - in /dev/spark/2.3.1-SNAPSHOT-2018_05_07_18_01-3a22fea-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue May 8 01:16:20 2018 New Revision: 26748 Log: Apache Spark 2.3.1-SNAPSHOT-2018_05_07_18_01-3a22fea docs [This commit notification would consist of 1443 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r26747 - in /dev/spark/2.4.0-SNAPSHOT-2018_05_07_16_01-76ecd09-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon May 7 23:16:03 2018 New Revision: 26747 Log: Apache Spark 2.4.0-SNAPSHOT-2018_05_07_16_01-76ecd09 docs [This commit notification would consist of 1461 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20114][ML] spark.ml parity for sequential pattern mining - PrefixSpan
Repository: spark Updated Branches: refs/heads/master f48bd6bdc -> 76ecd0950 [SPARK-20114][ML] spark.ml parity for sequential pattern mining - PrefixSpan ## What changes were proposed in this pull request? PrefixSpan API for spark.ml. New implementation instead of #20810 ## How was this patch tested? TestSuite added. Author: WeichenXuCloses #20973 from WeichenXu123/prefixSpan2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76ecd095 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76ecd095 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76ecd095 Branch: refs/heads/master Commit: 76ecd095024a658bf68e5db658e4416565b30c17 Parents: f48bd6b Author: WeichenXu Authored: Mon May 7 14:57:14 2018 -0700 Committer: Joseph K. Bradley Committed: Mon May 7 14:57:14 2018 -0700 -- .../org/apache/spark/ml/fpm/PrefixSpan.scala| 96 + .../org/apache/spark/mllib/fpm/PrefixSpan.scala | 3 +- .../apache/spark/ml/fpm/PrefixSpanSuite.scala | 136 +++ 3 files changed, 233 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/76ecd095/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala new file mode 100644 index 000..02168fe --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.fpm + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.mllib.fpm.{PrefixSpan => mllibPrefixSpan} +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{ArrayType, LongType, StructField, StructType} + +/** + * :: Experimental :: + * A parallel PrefixSpan algorithm to mine frequent sequential patterns. + * The PrefixSpan algorithm is described in J. Pei, et al., PrefixSpan: Mining Sequential Patterns + * Efficiently by Prefix-Projected Pattern Growth + * (see http://doi.org/10.1109/ICDE.2001.914830;>here). + * + * @see https://en.wikipedia.org/wiki/Sequential_Pattern_Mining;>Sequential Pattern Mining + * (Wikipedia) + */ +@Since("2.4.0") +@Experimental +object PrefixSpan { + + /** + * :: Experimental :: + * Finds the complete set of frequent sequential patterns in the input sequences of itemsets. 
+ * + * @param dataset A dataset or a dataframe containing a sequence column which is + *{{{Seq[Seq[_]]}}} type + * @param sequenceCol the name of the sequence column in dataset, rows with nulls in this column + *are ignored + * @param minSupport the minimal support level of the sequential pattern, any pattern that + * appears more than (minSupport * size-of-the-dataset) times will be output + * (recommended value: `0.1`). + * @param maxPatternLength the maximal length of the sequential pattern + * (recommended value: `10`). + * @param maxLocalProjDBSize The maximum number of items (including delimiters used in the + * internal storage format) allowed in a projected database before + * local processing. If a projected database exceeds this size, another + * iteration of distributed prefix growth is run + * (recommended value: `3200`). + * @return A `DataFrame` that contains columns of sequence and corresponding frequency. + * The schema of it will be: + * - `sequence: Seq[Seq[T]]` (T is the item type) + * - `freq: Long` + */ + @Since("2.4.0") + def findFrequentSequentialPatterns( + dataset:
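A minimal usage sketch of the API introduced in this commit, following the signature documented above (assumes a SparkSession named `spark`; the input data is illustrative):

```scala
import org.apache.spark.ml.fpm.PrefixSpan

val sequences = spark.createDataFrame(Seq(
  Tuple1(Seq(Seq(1, 2), Seq(3))),
  Tuple1(Seq(Seq(1), Seq(3, 2), Seq(1, 2))),
  Tuple1(Seq(Seq(1, 2), Seq(5))),
  Tuple1(Seq(Seq(6)))
)).toDF("sequence")

val patterns = PrefixSpan.findFrequentSequentialPatterns(
  sequences,
  sequenceCol = "sequence",
  minSupport = 0.5,
  maxPatternLength = 5,
  maxLocalProjDBSize = 32000000L)

patterns.show(truncate = false)   // columns: sequence (Seq[Seq[T]]) and freq (Long)
```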
spark git commit: [SPARK-22885][ML][TEST] ML test for StructuredStreaming: spark.ml.tuning
Repository: spark Updated Branches: refs/heads/master 1c9c5de95 -> f48bd6bdc [SPARK-22885][ML][TEST] ML test for StructuredStreaming: spark.ml.tuning ## What changes were proposed in this pull request? ML test for StructuredStreaming: spark.ml.tuning ## How was this patch tested? N/A Author: WeichenXuCloses #20261 from WeichenXu123/ml_stream_tuning_test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f48bd6bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f48bd6bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f48bd6bd Branch: refs/heads/master Commit: f48bd6bdc5aefd9ec43e2d0ee648d17add7ef554 Parents: 1c9c5de Author: WeichenXu Authored: Mon May 7 14:55:41 2018 -0700 Committer: Joseph K. Bradley Committed: Mon May 7 14:55:41 2018 -0700 -- .../apache/spark/ml/tuning/CrossValidatorSuite.scala | 15 +++ .../spark/ml/tuning/TrainValidationSplitSuite.scala | 15 +++ 2 files changed, 22 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f48bd6bd/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala index 15dade2..e6ee722 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala @@ -25,17 +25,17 @@ import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressio import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, MulticlassClassificationEvaluator, RegressionEvaluator} import org.apache.spark.ml.feature.HashingTF -import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.param.shared.HasInputCol import org.apache.spark.ml.regression.LinearRegression -import org.apache.spark.ml.util.{DefaultReadWriteTest, Identifiable, MLTestingUtils} -import org.apache.spark.mllib.util.{LinearDataGenerator, MLlibTestSparkContext} +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} +import org.apache.spark.mllib.util.LinearDataGenerator import org.apache.spark.sql.Dataset import org.apache.spark.sql.types.StructType class CrossValidatorSuite - extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + extends MLTest with DefaultReadWriteTest { import testImplicits._ @@ -66,6 +66,13 @@ class CrossValidatorSuite assert(parent.getRegParam === 0.001) assert(parent.getMaxIter === 10) assert(cvModel.avgMetrics.length === lrParamMaps.length) + +val result = cvModel.transform(dataset).select("prediction").as[Double].collect() +testTransformerByGlobalCheckFunc[(Double, Vector)](dataset.toDF(), cvModel, "prediction") { + rows => +val result2 = rows.map(_.getDouble(0)) +assert(result === result2) +} } test("cross validation with linear regression") { http://git-wip-us.apache.org/repos/asf/spark/blob/f48bd6bd/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala index 9024342..cd76acf 100644 --- 
a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala @@ -24,17 +24,17 @@ import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, OneVsRest} import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, RegressionEvaluator} -import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.param.shared.HasInputCol import org.apache.spark.ml.regression.LinearRegression -import org.apache.spark.ml.util.{DefaultReadWriteTest, Identifiable, MLTestingUtils} -import org.apache.spark.mllib.util.{LinearDataGenerator, MLlibTestSparkContext} +import org.apache.spark.ml.util.{DefaultReadWriteTest,
spark git commit: [SPARK-23291][SPARK-23291][R][FOLLOWUP] Update SparkR migration note for
Repository: spark Updated Branches: refs/heads/master 56a52e0a5 -> 1c9c5de95 [SPARK-23291][SPARK-23291][R][FOLLOWUP] Update SparkR migration note for ## What changes were proposed in this pull request? This PR fixes the migration note for SPARK-23291 since it's going to backport to 2.3.1. See the discussion in https://issues.apache.org/jira/browse/SPARK-23291 ## How was this patch tested? N/A Author: hyukjinkwonCloses #21249 from HyukjinKwon/SPARK-23291. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c9c5de9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c9c5de9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c9c5de9 Branch: refs/heads/master Commit: 1c9c5de951ed86290bcd7d8edaab952b8cacd290 Parents: 56a52e0 Author: hyukjinkwon Authored: Mon May 7 14:52:14 2018 -0700 Committer: Yanbo Liang Committed: Mon May 7 14:52:14 2018 -0700 -- docs/sparkr.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1c9c5de9/docs/sparkr.md -- diff --git a/docs/sparkr.md b/docs/sparkr.md index 7fabab5..4faad2c 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -664,6 +664,6 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma - For `summary`, option for statistics to compute has been added. Its output is changed from that from `describe`. - A warning can be raised if versions of SparkR package and the Spark JVM do not match. -## Upgrading to Spark 2.4.0 +## Upgrading to SparkR 2.3.1 and above - - The `start` parameter of `substr` method was wrongly subtracted by one, previously. In other words, the index specified by `start` parameter was considered as 0-base. This can lead to inconsistent substring results and also does not match with the behaviour with `substr` in R. It has been fixed so the `start` parameter of `substr` method is now 1-base, e.g., therefore to get the same result as `substr(df$a, 2, 5)`, it should be changed to `substr(df$a, 1, 4)`. + - In SparkR 2.3.0 and earlier, the `start` parameter of `substr` method was wrongly subtracted by one and considered as 0-based. This can lead to inconsistent substring results and also does not match with the behaviour with `substr` in R. In version 2.3.1 and later, it has been fixed so the `start` parameter of `substr` method is now 1-base. As an example, `substr(lit('abcdef'), 2, 4))` would result to `abc` in SparkR 2.3.0, and the result would be `bcd` in SparkR 2.3.1. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15750][MLLIB][PYSPARK] Constructing FPGrowth fails when no numPartitions specified in pyspark
Repository: spark Updated Branches: refs/heads/master d83e96372 -> 56a52e0a5 [SPARK-15750][MLLIB][PYSPARK] Constructing FPGrowth fails when no numPartitions specified in pyspark ## What changes were proposed in this pull request? Change FPGrowth from private to private[spark]. If no numPartitions is specified, then default value -1 is used. But -1 is only valid in the construction function of FPGrowth, but not in setNumPartitions. So I make this change and use the constructor directly rather than using set method. ## How was this patch tested? Unit test is added Author: Jeff ZhangCloses #13493 from zjffdu/SPARK-15750. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56a52e0a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56a52e0a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56a52e0a Branch: refs/heads/master Commit: 56a52e0a58fc82ea69e47d0d8c4f905565be7c8b Parents: d83e963 Author: Jeff Zhang Authored: Mon May 7 14:47:58 2018 -0700 Committer: Joseph K. Bradley Committed: Mon May 7 14:47:58 2018 -0700 -- .../apache/spark/mllib/api/python/PythonMLLibAPI.scala | 5 + .../scala/org/apache/spark/mllib/fpm/FPGrowth.scala | 2 +- python/pyspark/mllib/tests.py | 12 3 files changed, 14 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/56a52e0a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index b32d3f2..db3f074 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -572,10 +572,7 @@ private[python] class PythonMLLibAPI extends Serializable { data: JavaRDD[java.lang.Iterable[Any]], minSupport: Double, numPartitions: Int): FPGrowthModel[Any] = { -val fpg = new FPGrowth() - .setMinSupport(minSupport) - .setNumPartitions(numPartitions) - +val fpg = new FPGrowth(minSupport, numPartitions) val model = fpg.run(data.rdd.map(_.asScala.toArray)) new FPGrowthModelWrapper(model) } http://git-wip-us.apache.org/repos/asf/spark/blob/56a52e0a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala index f6b1143..4f2b7e6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala @@ -162,7 +162,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] { * */ @Since("1.3.0") -class FPGrowth private ( +class FPGrowth private[spark] ( private var minSupport: Double, private var numPartitions: Int) extends Logging with Serializable { http://git-wip-us.apache.org/repos/asf/spark/blob/56a52e0a/python/pyspark/mllib/tests.py -- diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 14d788b..4c2ce13 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -57,6 +57,7 @@ from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _ DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT from pyspark.mllib.linalg.distributed import RowMatrix from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD +from pyspark.mllib.fpm import FPGrowth from pyspark.mllib.recommendation 
import Rating from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD from pyspark.mllib.random import RandomRDDs @@ -1762,6 +1763,17 @@ class DimensionalityReductionTests(MLlibTestCase): self.assertEqualUpToSign(pcs.toArray()[:, k - 1], expected_pcs[:, k - 1]) +class FPGrowthTest(MLlibTestCase): + +def test_fpgrowth(self): +data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]] +rdd = self.sc.parallelize(data, 2) +model1 = FPGrowth.train(rdd, 0.6, 2) +# use default data partition number when numPartitions is not specified +model2 = FPGrowth.train(rdd, 0.6) +self.assertEqual(sorted(model1.freqItemsets().collect()), + sorted(model2.freqItemsets().collect())) + if __name__ == "__main__": from pyspark.mllib.tests
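For reference, a sketch of the JVM-side behaviour this fix relies on (assumes an active SparkContext `sc`; data is illustrative). The public builder API is unchanged; only the Python wrapper now passes `numPartitions` straight to the constructor, where the default `-1` ("keep the input RDD's partitioning") is valid, instead of going through `setNumPartitions`, which rejects it.

```scala
import org.apache.spark.mllib.fpm.FPGrowth

val transactions = sc.parallelize(Seq(
  Array("a", "b", "c"),
  Array("a", "b", "d", "e"),
  Array("a", "c", "e"),
  Array("a", "c", "f")), 2)

val model = new FPGrowth()     // defaults: minSupport = 0.3, numPartitions = -1
  .setMinSupport(0.6)
  .run(transactions)

model.freqItemsets.collect().foreach { fi =>
  println(fi.items.mkString("[", ",", "]") + ": " + fi.freq)
}
```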
spark git commit: [SPARK-23291][SQL][R][BRANCH-2.3] R's substr should not reduce starting position by 1 when calling Scala API
Repository: spark Updated Branches: refs/heads/branch-2.3 f87785a76 -> 3a22feab4 [SPARK-23291][SQL][R][BRANCH-2.3] R's substr should not reduce starting position by 1 when calling Scala API ## What changes were proposed in this pull request? This PR backports https://github.com/apache/spark/commit/24b5c69ee3feded439e5bb6390e4b63f503eeafe and https://github.com/apache/spark/pull/21249 There's no conflict but I opened this just to run the test and for sure. See the discussion in https://issues.apache.org/jira/browse/SPARK-23291 ## How was this patch tested? Jenkins tests. Author: hyukjinkwonAuthor: Liang-Chi Hsieh Closes #21250 from HyukjinKwon/SPARK-23291-backport. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a22feab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a22feab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a22feab Branch: refs/heads/branch-2.3 Commit: 3a22feab4dc9f0cffe3aaec692e27ab277666507 Parents: f87785a Author: hyukjinkwon Authored: Mon May 7 14:48:28 2018 -0700 Committer: Yanbo Liang Committed: Mon May 7 14:48:28 2018 -0700 -- R/pkg/R/column.R | 10 -- R/pkg/tests/fulltests/test_sparkSQL.R | 1 + docs/sparkr.md| 4 3 files changed, 13 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3a22feab/R/pkg/R/column.R -- diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R index 3095adb..3d6d9f9 100644 --- a/R/pkg/R/column.R +++ b/R/pkg/R/column.R @@ -164,12 +164,18 @@ setMethod("alias", #' @aliases substr,Column-method #' #' @param x a Column. -#' @param start starting position. +#' @param start starting position. It should be 1-base. #' @param stop ending position. +#' @examples +#' \dontrun{ +#' df <- createDataFrame(list(list(a="abcdef"))) +#' collect(select(df, substr(df$a, 1, 4))) # the result is `abcd`. +#' collect(select(df, substr(df$a, 2, 4))) # the result is `bcd`. +#' } #' @note substr since 1.4.0 setMethod("substr", signature(x = "Column"), function(x, start, stop) { -jc <- callJMethod(x@jc, "substr", as.integer(start - 1), as.integer(stop - start + 1)) +jc <- callJMethod(x@jc, "substr", as.integer(start), as.integer(stop - start + 1)) column(jc) }) http://git-wip-us.apache.org/repos/asf/spark/blob/3a22feab/R/pkg/tests/fulltests/test_sparkSQL.R -- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 5197838..bed26ec 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1649,6 +1649,7 @@ test_that("string operators", { expect_false(first(select(df, startsWith(df$name, "m")))[[1]]) expect_true(first(select(df, endsWith(df$name, "el")))[[1]]) expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi") + expect_equal(first(select(df, substr(df$name, 4, 6)))[[1]], "hae") if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) { expect_true(startsWith("Hello World", "Hello")) expect_false(endsWith("Hello World", "a")) http://git-wip-us.apache.org/repos/asf/spark/blob/3a22feab/docs/sparkr.md -- diff --git a/docs/sparkr.md b/docs/sparkr.md index 6685b58..73f9424 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -663,3 +663,7 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma - The `stringsAsFactors` parameter was previously ignored with `collect`, for example, in `collect(createDataFrame(iris), stringsAsFactors = TRUE))`. It has been corrected. - For `summary`, option for statistics to compute has been added. 
Its output is changed from that from `describe`. - A warning can be raised if versions of SparkR package and the Spark JVM do not match. + +## Upgrading to SparkR 2.3.1 and above + + - In SparkR 2.3.0 and earlier, the `start` parameter of `substr` method was wrongly subtracted by one and considered as 0-based. This can lead to inconsistent substring results and also does not match with the behaviour with `substr` in R. In version 2.3.1 and later, it has been fixed so the `start` parameter of `substr` method is now 1-base. As an example, `substr(lit('abcdef'), 2, 4))` would result to `abc` in SparkR 2.3.0, and the result would be `bcd` in SparkR 2.3.1. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands,
svn commit: r26745 - in /dev/spark/2.4.0-SNAPSHOT-2018_05_07_12_01-d83e963-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon May 7 19:16:37 2018 New Revision: 26745 Log: Apache Spark 2.4.0-SNAPSHOT-2018_05_07_12_01-d83e963 docs [This commit notification would consist of 1460 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24043][SQL] Interpreted Predicate should initialize nondeterministic expressions
Repository: spark Updated Branches: refs/heads/master 4e861db5f -> d83e96372 [SPARK-24043][SQL] Interpreted Predicate should initialize nondeterministic expressions ## What changes were proposed in this pull request? When creating an InterpretedPredicate instance, initialize any Nondeterministic expressions in the expression tree to avoid java.lang.IllegalArgumentException on later call to eval(). ## How was this patch tested? - sbt SQL tests - python SQL tests - new unit test Author: Bruce RobbinsCloses #21144 from bersprockets/interpretedpredicate. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d83e9637 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d83e9637 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d83e9637 Branch: refs/heads/master Commit: d83e9637246b05eea202add07a168688f6c0481b Parents: 4e861db Author: Bruce Robbins Authored: Mon May 7 17:54:39 2018 +0200 Committer: Herman van Hovell Committed: Mon May 7 17:54:39 2018 +0200 -- .../apache/spark/sql/catalyst/expressions/predicates.scala | 8 .../spark/sql/catalyst/expressions/PredicateSuite.scala | 6 ++ 2 files changed, 14 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d83e9637/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index e195ec1..f8c6dc4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -36,6 +36,14 @@ object InterpretedPredicate { case class InterpretedPredicate(expression: Expression) extends BasePredicate { override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean] + + override def initialize(partitionIndex: Int): Unit = { +super.initialize(partitionIndex) +expression.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +} + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/d83e9637/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala index 1bfd180..ac76b17 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala @@ -449,4 +449,10 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(EqualNullSafe(Literal(null, DoubleType), Literal(-1.0d)), false) checkEvaluation(EqualNullSafe(Literal(-1.0d), Literal(null, DoubleType)), false) } + + test("Interpreted Predicate should initialize nondeterministic expressions") { +val interpreted = InterpretedPredicate.create(LessThan(Rand(7), Literal(1.0))) +interpreted.initialize(0) +assert(interpreted.eval(new UnsafeRow())) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r26739 - in /dev/spark/2.4.0-SNAPSHOT-2018_05_07_05_34-4e861db-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon May 7 12:50:22 2018 New Revision: 26739 Log: Apache Spark 2.4.0-SNAPSHOT-2018_05_07_05_34-4e861db docs [This commit notification would consist of 1460 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16406][SQL] Improve performance of LogicalPlan.resolve
Repository: spark Updated Branches: refs/heads/master e35ad3cad -> 4e861db5f [SPARK-16406][SQL] Improve performance of LogicalPlan.resolve ## What changes were proposed in this pull request? `LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a name. This is fine in normal cases, but gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes. This PR adds an indexing structure to `resolve(...)` in order to find potential matches quicker. This PR improves the reference resolution time for the following code by 4x (11.8s -> 2.4s): ``` scala val n = 4000 val values = (1 to n).map(_.toString).mkString(", ") val columns = (1 to n).map("column" + _).mkString(", ") val query = s""" |SELECT $columns |FROM VALUES ($values) T($columns) |WHERE 1=2 AND 1 IN ($columns) |GROUP BY $columns |ORDER BY $columns |""".stripMargin spark.time(sql(query)) ``` ## How was this patch tested? Existing tests. Author: Herman van HovellCloses #14083 from hvanhovell/SPARK-16406. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e861db5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e861db5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e861db5 Branch: refs/heads/master Commit: 4e861db5f149e10fd8dfe6b3c1484821a590b1e8 Parents: e35ad3c Author: Herman van Hovell Authored: Mon May 7 11:21:22 2018 +0200 Committer: Herman van Hovell Committed: Mon May 7 11:21:22 2018 +0200 -- .../sql/catalyst/expressions/package.scala | 86 +++ .../catalyst/plans/logical/LogicalPlan.scala| 108 ++- 2 files changed, 93 insertions(+), 101 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4e861db5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala index 1a48995..8a06daa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala @@ -17,8 +17,12 @@ package org.apache.spark.sql.catalyst +import java.util.Locale + import com.google.common.collect.Maps +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{StructField, StructType} @@ -138,6 +142,88 @@ package object expressions { def indexOf(exprId: ExprId): Int = { Option(exprIdToOrdinal.get(exprId)).getOrElse(-1) } + +private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = { + m.mapValues(_.distinct).map(identity) +} + +/** Map to use for direct case insensitive attribute lookups. */ +@transient private lazy val direct: Map[String, Seq[Attribute]] = { + unique(attrs.groupBy(_.name.toLowerCase(Locale.ROOT))) +} + +/** Map to use for qualified case insensitive attribute lookups. */ +@transient private val qualified: Map[(String, String), Seq[Attribute]] = { + val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a => +(a.qualifier.get.toLowerCase(Locale.ROOT), a.name.toLowerCase(Locale.ROOT)) + } + unique(grouped) +} + +/** Perform attribute resolution given a name and a resolver. 
*/ +def resolve(nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = { + // Collect matching attributes given a name and a lookup. + def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = { +candidates.toSeq.flatMap(_.collect { + case a if resolver(a.name, name) => a.withName(name) +}) + } + + // Find matches for the given name assuming that the 1st part is a qualifier (i.e. table name, + // alias, or subquery alias) and the 2nd part is the actual name. This returns a tuple of + // matched attributes and a list of parts that are to be resolved. + // + // For example, consider an example where "a" is the table name, "b" is the column name, + // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b", + // and the second element will be List("c"). + val matches = nameParts match { +case qualifier +: name +: nestedFields => + val key = (qualifier.toLowerCase(Locale.ROOT),
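A self-contained sketch of the indexing idea (simplified; the names below are illustrative stand-ins, not the actual Catalyst types): attributes are grouped once by lower-cased, optionally qualified name, so each resolution becomes a hash-map probe instead of a linear scan over all output attributes.

```scala
import java.util.Locale

case class Attr(name: String, qualifier: Option[String] = None)

class AttributeIndex(attrs: Seq[Attr]) {
  // Built once per plan; reused for every name that needs resolving.
  private val direct: Map[String, Seq[Attr]] =
    attrs.groupBy(_.name.toLowerCase(Locale.ROOT))

  private val qualified: Map[(String, String), Seq[Attr]] =
    attrs.filter(_.qualifier.isDefined).groupBy { a =>
      (a.qualifier.get.toLowerCase(Locale.ROOT), a.name.toLowerCase(Locale.ROOT))
    }

  def resolve(nameParts: Seq[String]): Seq[Attr] = nameParts match {
    case Seq(q, name) =>
      qualified.getOrElse((q.toLowerCase(Locale.ROOT), name.toLowerCase(Locale.ROOT)), Nil)
    case Seq(name) =>
      direct.getOrElse(name.toLowerCase(Locale.ROOT), Nil)
    case _ => Nil
  }
}

val index = new AttributeIndex(Seq(Attr("column1", Some("t")), Attr("column2", Some("t"))))
println(index.resolve(Seq("t", "COLUMN1")))   // case-insensitive lookup via the precomputed map
```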
svn commit: r26737 - in /dev/spark/2.3.1-SNAPSHOT-2018_05_07_02_01-f87785a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon May 7 09:16:54 2018 New Revision: 26737 Log: Apache Spark 2.3.1-SNAPSHOT-2018_05_07_02_01-f87785a docs [This commit notification would consist of 1443 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23930][SQL] Add slice function
Repository: spark Updated Branches: refs/heads/master f06528015 -> e35ad3cad [SPARK-23930][SQL] Add slice function ## What changes were proposed in this pull request? The PR add the `slice` function. The behavior of the function is based on Presto's one. The function slices an array according to the requested start index and length. ## How was this patch tested? added UTs Author: Marco GaidoCloses #21040 from mgaido91/SPARK-23930. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e35ad3ca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e35ad3ca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e35ad3ca Branch: refs/heads/master Commit: e35ad3caddeaa4b0d4c8524dcfb9e9f56dc7fe3d Parents: f065280 Author: Marco Gaido Authored: Mon May 7 16:57:37 2018 +0900 Committer: Takuya UESHIN Committed: Mon May 7 16:57:37 2018 +0900 -- python/pyspark/sql/functions.py | 13 ++ .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/codegen/CodeGenerator.scala | 34 .../expressions/collectionOperations.scala | 163 ++- .../CollectionExpressionsSuite.scala| 28 .../expressions/ExpressionEvalHelper.scala | 6 + .../expressions/ObjectExpressionsSuite.scala| 1 - .../scala/org/apache/spark/sql/functions.scala | 10 ++ .../spark/sql/DataFrameFunctionsSuite.scala | 16 ++ 9 files changed, 233 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e35ad3ca/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index bd55b5f..ac3c797 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1834,6 +1834,19 @@ def array_contains(col, value): return Column(sc._jvm.functions.array_contains(_to_java_column(col), value)) +@since(2.4) +def slice(x, start, length): +""" +Collection function: returns an array containing all the elements in `x` from index `start` +(or starting from the end if `start` is negative) with the specified `length`. 
+>>> df = spark.createDataFrame([([1, 2, 3],), ([4, 5],)], ['x']) +>>> df.select(slice(df.x, 2, 2).alias("sliced")).collect() +[Row(sliced=[2, 3]), Row(sliced=[5])] +""" +sc = SparkContext._active_spark_context +return Column(sc._jvm.functions.slice(_to_java_column(x), start, length)) + + @ignore_unicode_prefix @since(2.4) def array_join(col, delimiter, null_replacement=None): http://git-wip-us.apache.org/repos/asf/spark/blob/e35ad3ca/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 01776b8..87b0911 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -410,6 +410,7 @@ object FunctionRegistry { expression[MapKeys]("map_keys"), expression[MapValues]("map_values"), expression[Size]("size"), +expression[Slice]("slice"), expression[Size]("cardinality"), expression[SortArray]("sort_array"), expression[ArrayMin]("array_min"), http://git-wip-us.apache.org/repos/asf/spark/blob/e35ad3ca/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index cf0a91f..4dda525 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.types._ import org.apache.spark.util.{ParentClassLoader, Utils} @@ -731,6 +732,39 @@ class CodegenContext { } /** + * Generates code creating a [[UnsafeArrayData]]. + * + * @param
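A short usage sketch for the new function (assumes a SparkSession named `spark` with implicits imported; the data is illustrative):

```scala
import org.apache.spark.sql.functions.slice
import spark.implicits._

val df = Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7)).toDF("xs")

// Take 2 elements starting at 1-based index 2.
df.select(slice(df("xs"), 2, 2).alias("sliced")).show()
// +------+
// |sliced|
// +------+
// |[2, 3]|
// |[6, 7]|
// +------+

// A negative start counts from the end of the array, following Presto's semantics.
spark.sql("SELECT slice(array(1, 2, 3, 4), -2, 2)").show()   // -> [3, 4]
```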
spark git commit: [SPARK-24160][FOLLOWUP] Fix compilation failure
Repository: spark Updated Branches: refs/heads/master c5981976f -> f06528015 [SPARK-24160][FOLLOWUP] Fix compilation failure ## What changes were proposed in this pull request? SPARK-24160 is causing a compilation failure (after SPARK-24143 was merged). This fixes the issue. ## How was this patch tested? building successfully Author: Marco GaidoCloses #21256 from mgaido91/SPARK-24160_FOLLOWUP. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0652801 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0652801 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0652801 Branch: refs/heads/master Commit: f06528015d5856d6dc5cce00309bc2ae985e080f Parents: c598197 Author: Marco Gaido Authored: Mon May 7 15:42:10 2018 +0800 Committer: Wenchen Fan Committed: Mon May 7 15:42:10 2018 +0800 -- .../apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f0652801/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 8e9374b..a2997db 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -546,7 +546,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT taskContext, transfer, blockManager, - blocksByAddress, + blocksByAddress.toIterator, (_, in) => in, 48 * 1024 * 1024, Int.MaxValue, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r26734 - in /dev/spark/2.4.0-SNAPSHOT-2018_05_07_00_01-c598197-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon May 7 07:17:46 2018 New Revision: 26734 Log: Apache Spark 2.4.0-SNAPSHOT-2018_05_07_00_01-c598197 docs [This commit notification would consist of 1460 parts, which exceeds the 50-part limit, so it was shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
Repository: spark Updated Branches: refs/heads/branch-2.3 3f78f60cc -> f87785a76 [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky ## What changes were proposed in this pull request? DataFrameRangeSuite.test("Cancelling stage in a query with Range.") stays sometimes in an infinite loop and times out the build. There were multiple issues with the test: 1. The first valid stageId is zero when the test started alone and not in a suite and the following code waits until timeout: ``` eventually(timeout(10.seconds), interval(1.millis)) { assert(DataFrameRangeSuite.stageToKill > 0) } ``` 2. The `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset which ended up in canceling the same stage 2 times. This caused the infinite wait. This PR solves this mentioned flakyness by removing the shared `DataFrameRangeSuite.stageToKill` and using `onTaskStart` where stage ID is provided. In order to make sure cancelStage called for all stages `waitUntilEmpty` is called on `ListenerBus`. In [PR20888](https://github.com/apache/spark/pull/20888) this tried to get solved by: * Stopping the executor thread with `wait` * Wait for all `cancelStage` called * Kill the executor thread by setting `SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL` but the thread killing left the shared `SparkContext` sometimes in a state where further jobs can't be submitted. As a result DataFrameRangeSuite.test("Cancelling stage in a query with Range.") test passed properly but the next test inside the suite was hanging. ## How was this patch tested? Existing unit test executed 10k times. Author: Gabor SomogyiCloses #21214 from gaborgsomogyi/SPARK-23775_1. (cherry picked from commit c5981976f1d514a3ad8a684b9a21cebe38b786fa) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f87785a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f87785a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f87785a7 Branch: refs/heads/branch-2.3 Commit: f87785a76f443bcf996c60d190afc29aa2e3b6e4 Parents: 3f78f60 Author: Gabor Somogyi Authored: Mon May 7 14:45:14 2018 +0800 Committer: Wenchen Fan Committed: Mon May 7 14:46:56 2018 +0800 -- .../apache/spark/sql/DataFrameRangeSuite.scala | 24 +++- 1 file changed, 8 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f87785a7/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala index 57a930d..b0b4664 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala @@ -23,8 +23,8 @@ import scala.util.Random import org.scalatest.concurrent.Eventually -import org.apache.spark.{SparkException, TaskContext} -import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} +import org.apache.spark.SparkException +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall test("Cancelling stage in a query with Range.") { val listener = new SparkListener { - override def onJobStart(jobStart: SparkListenerJobStart): Unit = { 
-eventually(timeout(10.seconds), interval(1.millis)) { - assert(DataFrameRangeSuite.stageToKill > 0) -} -sparkContext.cancelStage(DataFrameRangeSuite.stageToKill) + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { +sparkContext.cancelStage(taskStart.stageId) } } sparkContext.addSparkListener(listener) for (codegen <- Seq(true, false)) { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) { -DataFrameRangeSuite.stageToKill = -1 val ex = intercept[SparkException] { - spark.range(0, 1000L, 1, 1).map { x => -DataFrameRangeSuite.stageToKill = TaskContext.get().stageId() -x - }.toDF("id").agg(sum("id")).collect() + spark.range(0, 1000L, 1, 1) +.toDF("id").agg(sum("id")).collect() } ex.getCause() match { case null => @@ -180,6 +174,8 @@ class DataFrameRangeSuite extends QueryTest with
spark git commit: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
Repository: spark
Updated Branches: refs/heads/master d2aa859b4 -> c5981976f

[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky

## What changes were proposed in this pull request?

DataFrameRangeSuite.test("Cancelling stage in a query with Range.") sometimes gets stuck in an infinite loop and times out the build. There were multiple issues with the test:

1. The first valid stageId is zero when the test is started alone rather than as part of the suite, so the following code waits until timeout:
```
eventually(timeout(10.seconds), interval(1.millis)) {
  assert(DataFrameRangeSuite.stageToKill > 0)
}
```
2. `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset, which ended up cancelling the same stage twice. This caused the infinite wait.

This PR fixes the flakiness by removing the shared `DataFrameRangeSuite.stageToKill` and using `onTaskStart`, where the stage ID is provided with the event. To make sure `cancelStage` is called for all stages, `waitUntilEmpty` is called on the `ListenerBus`.

[PR 20888](https://github.com/apache/spark/pull/20888) tried to solve this by:
* stopping the executor thread with `wait`,
* waiting until all `cancelStage` calls were made,
* killing the executor thread by setting `SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL`,

but killing the thread sometimes left the shared `SparkContext` in a state where further jobs could not be submitted. As a result, DataFrameRangeSuite.test("Cancelling stage in a query with Range.") itself passed, but the next test inside the suite was hanging.

## How was this patch tested?

Existing unit test executed 10k times.

Author: Gabor Somogyi

Closes #21214 from gaborgsomogyi/SPARK-23775_1.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5981976
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5981976
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5981976
Branch: refs/heads/master
Commit: c5981976f1d514a3ad8a684b9a21cebe38b786fa
Parents: d2aa859
Author: Gabor Somogyi
Authored: Mon May 7 14:45:14 2018 +0800
Committer: Wenchen Fan
Committed: Mon May 7 14:45:14 2018 +0800
--
 .../apache/spark/sql/DataFrameRangeSuite.scala | 24 +++-
 1 file changed, 8 insertions(+), 16 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c5981976/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 57a930d..b0b4664 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -23,8 +23,8 @@ import scala.util.Random
 import org.scalatest.concurrent.Eventually
-import org.apache.spark.{SparkException, TaskContext}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.SparkException
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
-        }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sparkContext.cancelStage(taskStart.stageId)
       }
     }
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(0, 1000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+          spark.range(0, 1000L, 1, 1)
+            .toDF("id").agg(sum("id")).collect()
         }
         ex.getCause() match {
           case null =>
@@ -180,6 +174,8 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
             fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
         }
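The listener-based approach above can also be tried outside the suite. The sketch below is a minimal, self-contained illustration of the same idea (cancel the stage of whichever task starts first), not the committed test: the large row count and the `local[2]` master are arbitrary choices to make cancellation likely before the job finishes, and the committed test additionally drains the listener bus with `waitUntilEmpty`, a Spark-internal call that this standalone sketch omits.
```
import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object CancelOnTaskStartDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("cancel-demo").getOrCreate()
    val sc = spark.sparkContext

    // No shared mutable state is needed: the stage id to cancel arrives with the event itself.
    val listener = new SparkListener {
      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
        sc.cancelStage(taskStart.stageId)
      }
    }
    sc.addSparkListener(listener)

    try {
      // A deliberately large single-partition range so the stage is still running when the
      // cancellation arrives; a tiny job could finish before the listener fires.
      spark.range(0, 10000000000L, 1, 1).toDF("id").agg(sum("id")).collect()
      println("Job finished before it could be cancelled.")
    } catch {
      case e: SparkException =>
        println(s"Job was cancelled as expected: ${e.getMessage}")
    } finally {
      sc.removeSparkListener(listener)
      spark.stop()
    }
  }
}
```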
spark git commit: [SPARK-24160] ShuffleBlockFetcherIterator should fail if it receives zero-size blocks
Repository: spark
Updated Branches: refs/heads/master 7564a9a70 -> d2aa859b4

[SPARK-24160] ShuffleBlockFetcherIterator should fail if it receives zero-size blocks

## What changes were proposed in this pull request?

This patch modifies `ShuffleBlockFetcherIterator` so that the receipt of zero-size blocks is treated as an error. This is done as a preventative measure to guard against a potential source of data loss bugs.

In the shuffle layer, we guarantee that zero-size blocks will never be requested (a block containing zero records is always 0 bytes in size and is marked as empty such that it will never be legitimately requested by executors). However, the shuffle-read path did not fully take advantage of this invariant: it did not explicitly check that blocks are non-zero-size. Additionally, our decompression and deserialization streams treat zero-size inputs as empty streams rather than as errors: EOF may be treated as "end-of-stream" in certain layers (longstanding behavior dating back to the earliest versions of Spark), and decompressors like Snappy may be tolerant of zero-size inputs. As a result, if some other bug caused legitimate buffers to be replaced with zero-sized buffers (due to corruption on either the send or receive side), this would translate into silent data loss rather than an explicit fail-fast error.

This patch addresses this problem by adding a `buf.size != 0` check. See code comments for pointers to tests which guarantee the invariants relied on here.

## How was this patch tested?

Existing tests (which required modifications, since some were creating empty buffers in mocks). I also added a test to make sure we fail on zero-size blocks. To test that the zero-size blocks are indeed a potential corruption source, I manually ran a workload in `spark-shell` with a modified build which replaces all buffers with zero-size buffers in the receive path.

Author: Josh Rosen

Closes #21219 from JoshRosen/SPARK-24160.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2aa859b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2aa859b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2aa859b
Branch: refs/heads/master
Commit: d2aa859b4faeda03e32a7574dd0c5b4ed367fae4
Parents: 7564a9a
Author: Josh Rosen
Authored: Mon May 7 14:34:03 2018 +0800
Committer: Wenchen Fan
Committed: Mon May 7 14:34:03 2018 +0800
--
 .../storage/ShuffleBlockFetcherIterator.scala | 19 ++
 .../ShuffleBlockFetcherIteratorSuite.scala    | 71 ++--
 2 files changed, 70 insertions(+), 20 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d2aa859b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 6971efd..b318623 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -414,6 +414,25 @@ final class ShuffleBlockFetcherIterator(
           logDebug("Number of requests in flight " + reqsInFlight)
         }
+        if (buf.size == 0) {
+          // We will never legitimately receive a zero-size block. All blocks with zero records
+          // have zero size and all zero-size blocks have no records (and hence should never
+          // have been requested in the first place). This statement relies on behaviors of the
+          // shuffle writers, which are guaranteed by the following test cases:
+          //
+          // - BypassMergeSortShuffleWriterSuite: "write with some empty partitions"
+          // - UnsafeShuffleWriterSuite: "writeEmptyIterator"
+          // - DiskBlockObjectWriterSuite: "commit() and close() without ever opening or writing"
+          //
+          // There is not an explicit test for SortShuffleWriter but the underlying APIs that
+          // uses are shared by the UnsafeShuffleWriter (both writers use DiskBlockObjectWriter
+          // which returns a zero-size from commitAndGet() in case no records were written
+          // since the last call.
+          val msg = s"Received a zero-size buffer for block $blockId from $address " +
+            s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
+          throwFetchFailedException(blockId, address, new IOException(msg))
+        }
+
         val in = try {
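Stripped of the iterator's surrounding bookkeeping, the guard amounts to a simple fail-fast size check. The following is a standalone sketch of that idea only; `BlockRef` and `validateBlockSize` are illustrative names, not part of Spark's API:
```
import java.io.IOException

object ZeroSizeBlockCheckSketch {
  // Illustrative stand-in for a reference to a fetched shuffle block.
  final case class BlockRef(blockId: String, address: String, expectedApproxSize: Long)

  def validateBlockSize(block: BlockRef, actualSize: Long): Unit = {
    if (actualSize == 0) {
      // Zero-size blocks are never legitimately requested, so receiving one means the buffer
      // was corrupted or replaced somewhere on the send or receive side; fail fast instead of
      // silently decoding an empty stream.
      throw new IOException(
        s"Received a zero-size buffer for block ${block.blockId} from ${block.address} " +
          s"(expectedApproxSize = ${block.expectedApproxSize})")
    }
  }

  def main(args: Array[String]): Unit = {
    // This call throws, which is the desired fail-fast behaviour.
    validateBlockSize(BlockRef("shuffle_0_1_2", "host-a:7337", expectedApproxSize = 1024L), actualSize = 0L)
  }
}
```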
spark git commit: [SPARK-23921][SQL] Add array_sort function
Repository: spark
Updated Branches: refs/heads/master 889f6cc10 -> 7564a9a70

[SPARK-23921][SQL] Add array_sort function

## What changes were proposed in this pull request?

This PR adds the SQL function `array_sort`, whose behavior follows Presto's function of the same name. The function sorts the input array in ascending order; the elements of the input array must be orderable, and null elements are placed at the end of the returned array.

## How was this patch tested?

Added UTs.

Author: Kazuaki Ishizaki

Closes #21021 from kiszk/SPARK-23921.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7564a9a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7564a9a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7564a9a7
Branch: refs/heads/master
Commit: 7564a9a70695dac2f0b5f51493d37cbc93691663
Parents: 889f6cc
Author: Kazuaki Ishizaki
Authored: Mon May 7 15:22:23 2018 +0900
Committer: Takuya UESHIN
Committed: Mon May 7 15:22:23 2018 +0900
--
 python/pyspark/sql/functions.py                 | 26 +-
 .../catalyst/analysis/FunctionRegistry.scala    | 1 +
 .../expressions/collectionOperations.scala      | 240 +++
 .../CollectionExpressionsSuite.scala            | 34 ++-
 .../scala/org/apache/spark/sql/functions.scala  | 12 +
 .../spark/sql/DataFrameFunctionsSuite.scala     | 34 ++-
 6 files changed, 292 insertions(+), 55 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/7564a9a7/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index ad4bd6f..bd55b5f 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2183,20 +2183,38 @@ def array_max(col):
 def sort_array(col, asc=True):
     """
     Collection function: sorts the input array in ascending or descending order according
-    to the natural ordering of the array elements.
+    to the natural ordering of the array elements. Null elements will be placed at the beginning
+    of the returned array in ascending order or at the end of the returned array in descending
+    order.
 
     :param col: name of column or expression
 
-    >>> df = spark.createDataFrame([([2, 1, 3],),([1],),([],)], ['data'])
+    >>> df = spark.createDataFrame([([2, 1, None, 3],),([1],),([],)], ['data'])
     >>> df.select(sort_array(df.data).alias('r')).collect()
-    [Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])]
+    [Row(r=[None, 1, 2, 3]), Row(r=[1]), Row(r=[])]
     >>> df.select(sort_array(df.data, asc=False).alias('r')).collect()
-    [Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]
+    [Row(r=[3, 2, 1, None]), Row(r=[1]), Row(r=[])]
     """
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.functions.sort_array(_to_java_column(col), asc))
 
 
+@since(2.4)
+def array_sort(col):
+    """
+    Collection function: sorts the input array in ascending order. The elements of the input array
+    must be orderable. Null elements will be placed at the end of the returned array.
+
+    :param col: name of column or expression
+
+    >>> df = spark.createDataFrame([([2, 1, None, 3],),([1],),([],)], ['data'])
+    >>> df.select(array_sort(df.data).alias('r')).collect()
+    [Row(r=[1, 2, 3, None]), Row(r=[1]), Row(r=[])]
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.array_sort(_to_java_column(col)))
+
+
 @since(1.5)
 @ignore_unicode_prefix
 def reverse(col):

http://git-wip-us.apache.org/repos/asf/spark/blob/7564a9a7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 51bb6b0..01776b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -403,6 +403,7 @@ object FunctionRegistry {
     expression[ArrayContains]("array_contains"),
     expression[ArrayJoin]("array_join"),
     expression[ArrayPosition]("array_position"),
+    expression[ArraySort]("array_sort"),
     expression[CreateMap]("map"),
     expression[CreateNamedStruct]("named_struct"),
     expression[ElementAt]("element_at"),

http://git-wip-us.apache.org/repos/asf/spark/blob/7564a9a7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
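Since the same commit also adds `array_sort` to the Scala `functions` API (see the `.../scala/org/apache/spark/sql/functions.scala` entry in the file list above), the null-ordering difference between `array_sort` and `sort_array` can be seen with a short snippet like the one below. It is only a sketch assuming a local SparkSession; the column and app names are arbitrary.
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_sort, col, sort_array}

object ArraySortDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("array_sort-demo").getOrCreate()

    // One array column containing a null element, built via SQL to keep the encoders simple.
    val df = spark.sql("SELECT array(2, 1, CAST(NULL AS INT), 3) AS data")

    df.select(
      array_sort(col("data")).as("array_sort"),   // ascending, nulls last:  [1, 2, 3, null]
      sort_array(col("data")).as("sort_array")    // ascending, nulls first: [null, 1, 2, 3]
    ).show(false)

    spark.stop()
  }
}
```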
spark git commit: [SPARK-24143] filter empty blocks when convert mapstatus to (blockId, size) pair
Repository: spark
Updated Branches: refs/heads/master a634d66ce -> 889f6cc10

[SPARK-24143] filter empty blocks when convert mapstatus to (blockId, size) pair

## What changes were proposed in this pull request?

In the current code (`MapOutputTracker.convertMapStatuses`), map statuses are converted to (blockId, size) pairs for all blocks, no matter whether a block is empty or not, which results in OOM when there are lots of consecutive empty blocks, especially when adaptive execution is enabled. The (blockId, size) pairs are only used in `ShuffleBlockFetcherIterator` to control the shuffle read, and requests are only sent for non-empty blocks. Can we just filter out the empty blocks in `MapOutputTracker.convertMapStatuses` and save memory?

## How was this patch tested?

Not added yet.

Author: jinxing

Closes #21212 from jinxing64/SPARK-24143.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/889f6cc1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/889f6cc1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/889f6cc1
Branch: refs/heads/master
Commit: 889f6cc10cbd7781df04f468674a61f0ac5a870b
Parents: a634d66
Author: jinxing
Authored: Mon May 7 14:16:27 2018 +0800
Committer: Wenchen Fan
Committed: Mon May 7 14:16:27 2018 +0800
--
 .../org/apache/spark/MapOutputTracker.scala     | 31 +
 .../storage/ShuffleBlockFetcherIterator.scala   | 35
 .../apache/spark/MapOutputTrackerSuite.scala    | 31 -
 .../shuffle/BlockStoreShuffleReaderSuite.scala  | 2 +-
 .../ShuffleBlockFetcherIteratorSuite.scala      | 19 ++-
 5 files changed, 80 insertions(+), 38 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/889f6cc1/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 195fd4f..7364605 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolE
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer, Map}
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.reflect.ClassTag
@@ -282,7 +282,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   // For testing
   def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
     getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
   }
@@ -296,7 +296,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * describing the shuffle blocks that are stored at that block manager.
    */
   def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])]
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])]
   /**
    * Deletes map output status information for the specified shuffle stage.
@@ -632,9 +632,10 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
+  // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
   // This method is only called in local-mode.
   def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
     shuffleStatuses.get(shuffleId) match {
       case Some (shuffleStatus) =>
         shuffleStatus.withMapStatuses { statuses =>
           MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
         }
       case None =>
-        Seq.empty
+        Iterator.empty
     }
   }
@@ -669,8 +670,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   /** Remembers which map output locations are currently being fetched on an executor. */
   private val fetching = new HashSet[Int]
+  // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
   override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int,
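The idea behind the change, separated from the `MapOutputTracker` plumbing, is to skip zero-sized entries while grouping (blockId, size) pairs per location and to hand back an iterator instead of a fully materialized sequence. The sketch below only illustrates that shape: `ShuffleBlock`, `BlockLocation`, and `convertMapSizes` are made-up names, not Spark's internal types.
```
import scala.collection.mutable

object FilterEmptyBlocksSketch {
  final case class ShuffleBlock(shuffleId: Int, mapId: Int, reduceId: Int)
  final case class BlockLocation(host: String, port: Int)

  // statuses: for each map output, where it lives and the size of every reduce partition it wrote.
  def convertMapSizes(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
      statuses: Seq[(BlockLocation, Array[Long])]): Iterator[(BlockLocation, Seq[(ShuffleBlock, Long)])] = {
    val byLocation = mutable.HashMap.empty[BlockLocation, mutable.ListBuffer[(ShuffleBlock, Long)]]
    for (((location, sizes), mapId) <- statuses.zipWithIndex;
         part <- startPartition until endPartition) {
      val size = sizes(part)
      // Zero-sized blocks are dropped here instead of being carried all the way to the fetcher.
      if (size != 0) {
        byLocation.getOrElseUpdate(location, mutable.ListBuffer.empty) +=
          ((ShuffleBlock(shuffleId, mapId, part), size))
      }
    }
    byLocation.iterator.map { case (location, blocks) => (location, blocks.toSeq) }
  }

  def main(args: Array[String]): Unit = {
    // Two map outputs at the same location; every zero-sized partition disappears from the result.
    val statuses = Seq(
      (BlockLocation("host-a", 7337), Array(0L, 12L, 0L)),
      (BlockLocation("host-a", 7337), Array(7L, 0L, 0L)))
    convertMapSizes(shuffleId = 0, startPartition = 0, endPartition = 3, statuses).foreach(println)
  }
}
```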