spark git commit: [SPARK-24951][SQL] Table valued functions should throw AnalysisException
Repository: spark Updated Branches: refs/heads/master 5f3441e54 -> 1f7e22c72 [SPARK-24951][SQL] Table valued functions should throw AnalysisException ## What changes were proposed in this pull request? Previously, TVF resolution could throw an IllegalArgumentException if the data type was the null type. This patch replaces that exception with AnalysisException, enriched with positional information, to improve error message reporting and to be more consistent with the rest of Spark SQL. ## How was this patch tested? Updated the test case in table-valued-functions.sql.out, which is how I identified this problem in the first place. Author: Reynold Xin Closes #21934 from rxin/SPARK-24951. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f7e22c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f7e22c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f7e22c7 Branch: refs/heads/master Commit: 1f7e22c72c89fc2c0e729dde0948bc6bdf8f7628 Parents: 5f3441e Author: Reynold Xin Authored: Tue Jul 31 22:25:40 2018 -0700 Committer: Xiao Li Committed: Tue Jul 31 22:25:40 2018 -0700 -- .../analysis/ResolveTableValuedFunctions.scala | 34 ++-- .../results/table-valued-functions.sql.out | 9 -- 2 files changed, 32 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1f7e22c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala index 7358f9e..983e4b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import java.util.Locale +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Alias, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Range} import org.apache.spark.sql.catalyst.rules._ @@ -68,9 +69,11 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] { : (ArgumentList, Seq[Any] => LogicalPlan) = { (ArgumentList(args: _*), pf orElse { - case args => - throw new IllegalArgumentException( - "Invalid arguments for resolved function: " + args.mkString(", ")) + case arguments => + // This is caught again by the apply function and rethrown with richer information about + // position, etc., for a better error message. + throw new AnalysisException( + "Invalid arguments for resolved function: " + arguments.mkString(", ")) }) } @@ -105,22 +108,35 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) => + // The whole resolution is somewhat difficult to understand here due to too many abstractions. + // We should probably rewrite the following at some point. Reynold was just here to improve + // error messages and didn't have time to do a proper rewrite. 
val resolvedFunc = builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match { case Some(tvf) => + + def failAnalysis(): Nothing = { +val argTypes = u.functionArgs.map(_.dataType.typeName).mkString(", ") +u.failAnalysis( + s"""error: table-valued function ${u.functionName} with alternatives: + |${tvf.keys.map(_.toString).toSeq.sorted.map(x => s" ($x)").mkString("\n")} + |cannot be applied to: ($argTypes)""".stripMargin) + } + val resolved = tvf.flatMap { case (argList, resolver) => argList.implicitCast(u.functionArgs) match { case Some(casted) => -Some(resolver(casted.map(_.eval()))) +try { + Some(resolver(casted.map(_.eval()))) +} catch { + case e: AnalysisException => +failAnalysis() +} case _ => None } } resolved.headOption.getOrElse { -val argTypes = u.functionArgs.map(_.dataType.typeName).mkString(", ") -u.failAnalysis( - s"""error: table-valued function ${u.functionName} with
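A quick way to see the user-facing effect of the patch above is to run a table-valued function call whose arguments cannot be resolved and inspect the exception. The sketch below is illustrative and not part of the commit: it assumes a local SparkSession and that `range(1, NULL)` is one of the calls whose argument list fails implicit casting, as in the updated table-valued-functions.sql test.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object TvfErrorExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("tvf-error-demo").getOrCreate()
    try {
      // A TVF call with a null-typed argument; previously this could surface as an
      // IllegalArgumentException, now it is reported as an AnalysisException.
      spark.sql("SELECT * FROM range(1, NULL)").show()
    } catch {
      case e: AnalysisException =>
        // With this patch the exception carries the position of the offending call in the SQL text.
        println(s"line=${e.line}, startPosition=${e.startPosition}")
        println(e.message)
    } finally {
      spark.stop()
    }
  }
}
```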
svn commit: r28482 - in /dev/spark/2.4.0-SNAPSHOT-2018_07_31_20_01-5f3441e-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Aug 1 03:16:04 2018 New Revision: 28482 Log: Apache Spark 2.4.0-SNAPSHOT-2018_07_31_20_01-5f3441e docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24893][SQL] Remove the entire CaseWhen if all the outputs are semantically equivalent
Repository: spark Updated Branches: refs/heads/master f4772fd26 -> 5f3441e54 [SPARK-24893][SQL] Remove the entire CaseWhen if all the outputs are semantically equivalent ## What changes were proposed in this pull request? Similar to SPARK-24890, if all the outputs of `CaseWhen` are semantically equivalent, `CaseWhen` can be removed. ## How was this patch tested? Tests added. Author: DB Tsai Closes #21852 from dbtsai/short-circuit-when. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f3441e5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f3441e5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f3441e5 Branch: refs/heads/master Commit: 5f3441e542bfacd81d70bd8b34c22044c8928bff Parents: f4772fd Author: DB Tsai Authored: Wed Aug 1 10:31:02 2018 +0800 Committer: Wenchen Fan Committed: Wed Aug 1 10:31:02 2018 +0800 -- .../sql/catalyst/optimizer/expressions.scala| 18 .../optimizer/SimplifyConditionalSuite.scala| 48 +++- 2 files changed, 64 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5f3441e5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 4696699..e7b4730 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -416,6 +416,24 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { // these branches can be pruned away val (h, t) = branches.span(_._1 != TrueLiteral) CaseWhen( h :+ t.head, None) + + case e @ CaseWhen(branches, Some(elseValue)) + if branches.forall(_._2.semanticEquals(elseValue)) => +// For non-deterministic conditions with side effects, we cannot remove them or change +// the ordering. As a result, we try to remove the deterministic conditions from the tail. 
+var hitNonDeterministicCond = false +var i = branches.length +while (i > 0 && !hitNonDeterministicCond) { + hitNonDeterministicCond = !branches(i - 1)._1.deterministic + if (!hitNonDeterministicCond) { +i -= 1 + } +} +if (i == 0) { + elseValue +} else { + e.copy(branches = branches.take(i).map(branch => (branch._1, elseValue))) +} } } } http://git-wip-us.apache.org/repos/asf/spark/blob/5f3441e5/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala index e210874..8ad7c12 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} @@ -46,7 +45,9 @@ class SimplifyConditionalSuite extends PlanTest with PredicateHelper { private val unreachableBranch = (FalseLiteral, Literal(20)) private val nullBranch = (Literal.create(null, NullType), Literal(30)) - private val testRelation = LocalRelation('a.int) + val isNotNullCond = IsNotNull(UnresolvedAttribute(Seq("a"))) + val isNullCond = IsNull(UnresolvedAttribute("b")) + val notCond = Not(UnresolvedAttribute("c")) test("simplify if") { assertEquivalent( @@ -122,4 +123,47 @@ class SimplifyConditionalSuite extends PlanTest with PredicateHelper { None), CaseWhen(normalBranch :: trueBranch :: Nil, None)) } + + test("simplify CaseWhen if all the outputs are semantic equivalence") { +// When the conditions in `CaseWhen` are all deterministic, `CaseWhen` can be removed. +assertEquivalent( + CaseWhen((isNotNullCond, Subtract(Literal(3), Literal(2))) :: +(isNullCond, Literal(1)) :: +(notCond, Add(Literal(6), Literal(-5))) :: +Nil,
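The interesting part of the new rule is the tail-trimming loop above: when every branch value is semantically equal to the else value, branches may be dropped, but only from the tail and only while their conditions are deterministic, because a non-deterministic condition (for example one involving `rand()`) must keep its evaluation order and multiplicity. The following stand-alone sketch mirrors that loop with plain Scala types; `Branch` and its `deterministic` flag are illustrative stand-ins for Catalyst's condition/value pairs, not the actual optimizer code.

```scala
final case class Branch(condition: String, deterministic: Boolean)

object CaseWhenTrimSketch {
  // Assuming every branch value is semantically equal to the else value, compute how many
  // leading branches must be kept: everything up to and including the last branch whose
  // condition is non-deterministic. A result of 0 means the whole CaseWhen collapses to
  // the else value.
  def keepCount(branches: Seq[Branch]): Int = {
    var i = branches.length
    while (i > 0 && branches(i - 1).deterministic) i -= 1
    i
  }
}

object CaseWhenTrimExample extends App {
  val branches = Seq(
    Branch("a IS NOT NULL", deterministic = true),
    Branch("rand() > 0.5", deterministic = false),
    Branch("b IS NULL", deterministic = true))
  // Keeps the first two branches (up to the non-deterministic one) and drops the last.
  println(CaseWhenTrimSketch.keepCount(branches)) // 2
}
```

In the rule itself, the kept branches additionally have their values rewritten to the else value (the `e.copy(branches = branches.take(i).map(...))` call in the diff above), and the whole expression is replaced by `elseValue` when `i == 0`.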
svn commit: r28481 - in /dev/spark/2.3.3-SNAPSHOT-2018_07_31_18_01-5b187a8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Aug 1 01:15:26 2018 New Revision: 28481 Log: Apache Spark 2.3.3-SNAPSHOT-2018_07_31_18_01-5b187a8 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to PyArrow 0.9.0)
Repository: spark Updated Branches: refs/heads/branch-2.3 fc3df4517 -> 5b187a85a [SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to PyArrow 0.9.0) ## What changes were proposed in this pull request? See [ARROW-2432](https://jira.apache.org/jira/browse/ARROW-2432). Seems using `from_pandas` to convert decimals fails if encounters a value of `None`: ```python import pyarrow as pa import pandas as pd from decimal import Decimal pa.Array.from_pandas(pd.Series([Decimal('3.14'), None]), type=pa.decimal128(3, 2)) ``` **Arrow 0.8.0** ``` [ Decimal('3.14'), NA ] ``` **Arrow 0.9.0** ``` Traceback (most recent call last): File "", line 1, in File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas File "array.pxi", line 177, in pyarrow.lib.array File "error.pxi", line 77, in pyarrow.lib.check_status File "error.pxi", line 77, in pyarrow.lib.check_status pyarrow.lib.ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal ``` This PR propose to work around this via Decimal NaN: ```python pa.Array.from_pandas(pd.Series([Decimal('3.14'), Decimal('NaN')]), type=pa.decimal128(3, 2)) ``` ``` [ Decimal('3.14'), NA ] ``` ## How was this patch tested? Manually tested: ```bash SPARK_TESTING=1 ./bin/pyspark pyspark.sql.tests ScalarPandasUDFTests ``` **Before** ``` Traceback (most recent call last): File "/.../spark/python/pyspark/sql/tests.py", line 4672, in test_vectorized_udf_null_decimal self.assertEquals(df.collect(), res.collect()) File "/.../spark/python/pyspark/sql/dataframe.py", line 533, in collect sock_info = self._jdf.collectToPython() File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o51.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 1 times, most recent failure: Lost task 3.0 in stage 1.0 (TID 7, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/pyspark/worker.py", line 320, in main process() File "/.../spark/python/pyspark/worker.py", line 315, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/.../spark/python/pyspark/serializers.py", line 274, in dump_stream batch = _create_batch(series, self._timezone) File "/.../spark/python/pyspark/serializers.py", line 243, in _create_batch arrs = [create_array(s, t) for s, t in series] File "/.../spark/python/pyspark/serializers.py", line 241, in create_array return pa.Array.from_pandas(s, mask=mask, type=t) File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas File "array.pxi", line 177, in pyarrow.lib.array File "error.pxi", line 77, in pyarrow.lib.check_status File "error.pxi", line 77, in pyarrow.lib.check_status ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal ``` **After** ``` Running tests... -- Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ...S. 
-- Ran 37 tests in 21.980s ``` Author: hyukjinkwon Closes #21928 from HyukjinKwon/SPARK-24976. (cherry picked from commit f4772fd26f32b11ae54e7721924b5cf6eb27298a) Signed-off-by: Bryan Cutler Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b187a85 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b187a85 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b187a85 Branch: refs/heads/branch-2.3 Commit: 5b187a85a24c788e19742e03f1300662d475bab8 Parents: fc3df45 Author: hyukjinkwon Authored: Tue Jul 31 17:24:24 2018 -0700 Committer: Bryan Cutler Committed: Tue Jul 31 17:24:55 2018 -0700 -- python/pyspark/serializers.py | 10 -- 1 file changed, 8 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5b187a85/python/pyspark/serializers.py -- diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 6d107f3..52a7afe 100644 --- a/python/pyspark/serializers.py
spark git commit: [SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to PyArrow 0.9.0)
Repository: spark Updated Branches: refs/heads/master 42dfe4f15 -> f4772fd26 [SPARK-24976][PYTHON] Allow None for Decimal type conversion (specific to PyArrow 0.9.0) ## What changes were proposed in this pull request? See [ARROW-2432](https://jira.apache.org/jira/browse/ARROW-2432). Seems using `from_pandas` to convert decimals fails if encounters a value of `None`: ```python import pyarrow as pa import pandas as pd from decimal import Decimal pa.Array.from_pandas(pd.Series([Decimal('3.14'), None]), type=pa.decimal128(3, 2)) ``` **Arrow 0.8.0** ``` [ Decimal('3.14'), NA ] ``` **Arrow 0.9.0** ``` Traceback (most recent call last): File "", line 1, in File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas File "array.pxi", line 177, in pyarrow.lib.array File "error.pxi", line 77, in pyarrow.lib.check_status File "error.pxi", line 77, in pyarrow.lib.check_status pyarrow.lib.ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal ``` This PR propose to work around this via Decimal NaN: ```python pa.Array.from_pandas(pd.Series([Decimal('3.14'), Decimal('NaN')]), type=pa.decimal128(3, 2)) ``` ``` [ Decimal('3.14'), NA ] ``` ## How was this patch tested? Manually tested: ```bash SPARK_TESTING=1 ./bin/pyspark pyspark.sql.tests ScalarPandasUDFTests ``` **Before** ``` Traceback (most recent call last): File "/.../spark/python/pyspark/sql/tests.py", line 4672, in test_vectorized_udf_null_decimal self.assertEquals(df.collect(), res.collect()) File "/.../spark/python/pyspark/sql/dataframe.py", line 533, in collect sock_info = self._jdf.collectToPython() File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o51.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 1 times, most recent failure: Lost task 3.0 in stage 1.0 (TID 7, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/.../spark/python/pyspark/worker.py", line 320, in main process() File "/.../spark/python/pyspark/worker.py", line 315, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/.../spark/python/pyspark/serializers.py", line 274, in dump_stream batch = _create_batch(series, self._timezone) File "/.../spark/python/pyspark/serializers.py", line 243, in _create_batch arrs = [create_array(s, t) for s, t in series] File "/.../spark/python/pyspark/serializers.py", line 241, in create_array return pa.Array.from_pandas(s, mask=mask, type=t) File "array.pxi", line 383, in pyarrow.lib.Array.from_pandas File "array.pxi", line 177, in pyarrow.lib.array File "error.pxi", line 77, in pyarrow.lib.check_status File "error.pxi", line 77, in pyarrow.lib.check_status ArrowInvalid: Error converting from Python objects to Decimal: Got Python object of type NoneType but can only handle these types: decimal.Decimal ``` **After** ``` Running tests... -- Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ...S. 
-- Ran 37 tests in 21.980s ``` Author: hyukjinkwon Closes #21928 from HyukjinKwon/SPARK-24976. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4772fd2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4772fd2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4772fd2 Branch: refs/heads/master Commit: f4772fd26f32b11ae54e7721924b5cf6eb27298a Parents: 42dfe4f Author: hyukjinkwon Authored: Tue Jul 31 17:24:24 2018 -0700 Committer: Bryan Cutler Committed: Tue Jul 31 17:24:24 2018 -0700 -- python/pyspark/serializers.py | 10 -- 1 file changed, 8 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f4772fd2/python/pyspark/serializers.py -- diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 4c16b5f..82abf19 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -216,9 +216,10 @@ def _create_batch(series, timezone): :param
svn commit: r28471 - in /dev/spark/2.4.0-SNAPSHOT-2018_07_31_16_02-42dfe4f-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jul 31 23:16:17 2018 New Revision: 28471 Log: Apache Spark 2.4.0-SNAPSHOT-2018_07_31_16_02-42dfe4f docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24973][PYTHON] Add numIter to Python ClusteringSummary
Repository: spark Updated Branches: refs/heads/master e82784d13 -> 42dfe4f15 [SPARK-24973][PYTHON] Add numIter to Python ClusteringSummary ## What changes were proposed in this pull request? Add numIter to Python version of ClusteringSummary ## How was this patch tested? Modified existing UT test_multiclass_logistic_regression_summary Author: Huaxin Gao Closes #21925 from huaxingao/spark-24973. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42dfe4f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42dfe4f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42dfe4f1 Branch: refs/heads/master Commit: 42dfe4f1593767eae355e27bf969339f4ab03f56 Parents: e82784d Author: Huaxin Gao Authored: Tue Jul 31 15:23:11 2018 -0500 Committer: Sean Owen Committed: Tue Jul 31 15:23:11 2018 -0500 -- python/pyspark/ml/clustering.py | 8 python/pyspark/ml/tests.py | 3 +++ 2 files changed, 11 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/42dfe4f1/python/pyspark/ml/clustering.py -- diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index 8a58d83..ef9822d 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -88,6 +88,14 @@ class ClusteringSummary(JavaWrapper): """ return self._call_java("clusterSizes") +@property +@since("2.4.0") +def numIter(self): +""" +Number of iterations. +""" +return self._call_java("numIter") + class GaussianMixtureModel(JavaModel, JavaMLWritable, JavaMLReadable): """ http://git-wip-us.apache.org/repos/asf/spark/blob/42dfe4f1/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index bc78213..3d8883b 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1888,6 +1888,7 @@ class TrainingSummaryTest(SparkSessionTestCase): self.assertTrue(isinstance(s.cluster, DataFrame)) self.assertEqual(len(s.clusterSizes), 2) self.assertEqual(s.k, 2) +self.assertEqual(s.numIter, 3) def test_bisecting_kmeans_summary(self): data = [(Vectors.dense(1.0),), (Vectors.dense(5.0),), (Vectors.dense(10.0),), @@ -1903,6 +1904,7 @@ class TrainingSummaryTest(SparkSessionTestCase): self.assertTrue(isinstance(s.cluster, DataFrame)) self.assertEqual(len(s.clusterSizes), 2) self.assertEqual(s.k, 2) +self.assertEqual(s.numIter, 20) def test_kmeans_summary(self): data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),), @@ -1918,6 +1920,7 @@ class TrainingSummaryTest(SparkSessionTestCase): self.assertTrue(isinstance(s.cluster, DataFrame)) self.assertEqual(len(s.clusterSizes), 2) self.assertEqual(s.k, 2) +self.assertEqual(s.numIter, 1) class KMeansTests(SparkSessionTestCase): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18057][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
Repository: spark Updated Branches: refs/heads/master 1223a201f -> e82784d13 [SPARK-18057][SS] Update Kafka client version from 0.10.0.1 to 2.0.0 ## What changes were proposed in this pull request? This PR upgrades to the Kafka 2.0.0 release where KIP-266 is integrated. ## How was this patch tested? This PR uses existing Kafka related unit tests (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: tedyu Closes #21488 from tedyu/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e82784d1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e82784d1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e82784d1 Branch: refs/heads/master Commit: e82784d13fac7d45164dfadb00d3fa43e64e0bde Parents: 1223a20 Author: tedyu Authored: Tue Jul 31 13:14:14 2018 -0700 Committer: zsxwing Committed: Tue Jul 31 13:14:14 2018 -0700 -- external/kafka-0-10-sql/pom.xml | 24 +++-- .../kafka010/KafkaContinuousSourceSuite.scala | 1 + .../kafka010/KafkaMicroBatchSourceSuite.scala | 7 +++- .../spark/sql/kafka010/KafkaTestUtils.scala | 36 +--- 4 files changed, 53 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e82784d1/external/kafka-0-10-sql/pom.xml -- diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 16bbc6d..9550003 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -29,10 +29,10 @@ spark-sql-kafka-0-10_2.11 sql-kafka-0-10 -0.10.0.1 +2.0.0 jar - Kafka 0.10 Source for Structured Streaming + Kafka 0.10+ Source for Structured Streaming http://spark.apache.org/ @@ -73,6 +73,20 @@ kafka_${scala.binary.version} ${kafka.version} test + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-annotations + + net.sf.jopt-simple @@ -80,6 +94,12 @@ 3.2 test + +org.eclipse.jetty +jetty-servlet +${jetty.version} +test + org.scalacheck scalacheck_${scala.binary.version} http://git-wip-us.apache.org/repos/asf/spark/blob/e82784d1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index aab8ec4..ea2a2a8 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -42,6 +42,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribePattern", s"$topicPrefix-.*") .option("failOnDataLoss", "false") http://git-wip-us.apache.org/repos/asf/spark/blob/e82784d1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 5d5e573..aa89868 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -290,6 +290,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribePattern", s"$topicPrefix-.*") .option("failOnDataLoss", "false")
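For readers who want to try the new client setting outside the test suites, the sketch below shows a structured streaming reader configured the same way as the updated tests above. It is a hand-written example, not part of the commit: the broker address and topic pattern are placeholders, the `kafka.default.api.timeout.ms` value simply mirrors the tests (the underlying `default.api.timeout.ms` consumer config comes from KIP-266 in the Kafka 2.0 client), and running it requires the `spark-sql-kafka-0-10` package on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object KafkaSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("kafka-source-demo").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker address
      .option("kafka.metadata.max.age.ms", "1")              // refresh topic metadata aggressively
      .option("kafka.default.api.timeout.ms", "3000")        // client-side API timeout, as in the updated tests
      .option("subscribePattern", "topic-.*")                // placeholder topic pattern
      .option("failOnDataLoss", "false")
      .load()

    // Print key/value pairs to the console as they arrive.
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
```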
svn commit: r28463 - in /dev/spark/2.4.0-SNAPSHOT-2018_07_31_12_04-1223a20-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jul 31 19:18:53 2018 New Revision: 28463 Log: Apache Spark 2.4.0-SNAPSHOT-2018_07_31_12_04-1223a20 docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24609][ML][DOC] PySpark/SparkR doc doesn't explain RandomForestClassifier.featureSubsetStrategy well
Repository: spark Updated Branches: refs/heads/master 4ac2126bc -> 1223a201f [SPARK-24609][ML][DOC] PySpark/SparkR doc doesn't explain RandomForestClassifier.featureSubsetStrategy well ## What changes were proposed in this pull request? update doc of RandomForestClassifier.featureSubsetStrategy ## How was this patch tested? local built doc rdoc: ![default](https://user-images.githubusercontent.com/7322292/42807787-4dda6362-89e4-11e8-839f-a8519b7c1f1c.png) pydoc: ![default](https://user-images.githubusercontent.com/7322292/43112817-5f1d4d88-8f2a-11e8-93ff-de90db8afdca.png) Author: zhengruifeng Closes #21788 from zhengruifeng/rf_doc_py_r. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1223a201 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1223a201 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1223a201 Branch: refs/heads/master Commit: 1223a201fcb2c2f211ad96997ebb00c3554aa822 Parents: 4ac2126 Author: zhengruifeng Authored: Tue Jul 31 13:37:13 2018 -0500 Committer: Sean Owen Committed: Tue Jul 31 13:37:13 2018 -0500 -- R/pkg/R/mllib_tree.R| 13 - python/pyspark/ml/regression.py | 9 +++-- 2 files changed, 19 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1223a201/R/pkg/R/mllib_tree.R -- diff --git a/R/pkg/R/mllib_tree.R b/R/pkg/R/mllib_tree.R index 6769be0..0e60842 100644 --- a/R/pkg/R/mllib_tree.R +++ b/R/pkg/R/mllib_tree.R @@ -362,7 +362,18 @@ setMethod("write.ml", signature(object = "GBTClassificationModel", path = "chara #' For regression, must be "variance". For classification, must be one of #' "entropy" and "gini", default is "gini". #' @param featureSubsetStrategy The number of features to consider for splits at each tree node. -#'Supported options: "auto", "all", "onethird", "sqrt", "log2", (0.0-1.0], [1-n]. +#' Supported options: "auto" (choose automatically for task: If +#' numTrees == 1, set to "all." If numTrees > 1 +#' (forest), set to "sqrt" for classification and +#' to "onethird" for regression), +#' "all" (use all features), +#' "onethird" (use 1/3 of the features), +#' "sqrt" (use sqrt(number of features)), +#' "log2" (use log2(number of features)), +#' "n": (when n is in the range (0, 1.0], use +#' n * number of features. When n is in the range +#' (1, number of features), use n features). +#' Default is "auto". #' @param seed integer seed for random number generation. #' @param subsamplingRate Fraction of the training data used for learning each decision tree, in #'range (0, 1]. http://git-wip-us.apache.org/repos/asf/spark/blob/1223a201/python/pyspark/ml/regression.py -- diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py index 83f0edb..564c9f1 100644 --- a/python/pyspark/ml/regression.py +++ b/python/pyspark/ml/regression.py @@ -608,8 +608,13 @@ class TreeEnsembleParams(DecisionTreeParams): featureSubsetStrategy = \ Param(Params._dummy(), "featureSubsetStrategy", "The number of features to consider for splits at each tree node. Supported " + - "options: " + ", ".join(supportedFeatureSubsetStrategies) + ", (0.0-1.0], [1-n].", - typeConverter=TypeConverters.toString) + "options: 'auto' (choose automatically for task: If numTrees == 1, set to " + + "'all'. 
If numTrees > 1 (forest), set to 'sqrt' for classification and to " + + "'onethird' for regression), 'all' (use all features), 'onethird' (use " + + "1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use " + + "log2(number of features)), 'n' (when n is in the range (0, 1.0], use " + + "n * number of features. When n is in the range (1, number of features), use" + + " n features). default = 'auto'", typeConverter=TypeConverters.toString) def __init__(self): super(TreeEnsembleParams, self).__init__()
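Because the strategy values are easy to misread, here is a small stand-alone sketch of how each documented value maps to a number of features to consider per split. It follows the documented semantics only; rounding and validation in MLlib's actual implementation may differ, and the object and method names are illustrative.

```scala
object FeatureSubsetSketch {
  // numFeatures: total number of features; numTrees/isClassification drive the "auto" behaviour.
  def numFeaturesToTry(strategy: String, numFeatures: Int, numTrees: Int, isClassification: Boolean): Int =
    strategy.toLowerCase match {
      case "auto" =>
        if (numTrees == 1) numFeatures                                // single tree: use all features
        else if (isClassification) math.sqrt(numFeatures).ceil.toInt  // forest + classification: sqrt
        else (numFeatures / 3.0).ceil.toInt                           // forest + regression: one third
      case "all"      => numFeatures
      case "onethird" => (numFeatures / 3.0).ceil.toInt
      case "sqrt"     => math.sqrt(numFeatures).ceil.toInt
      case "log2"     => (math.log(numFeatures) / math.log(2)).ceil.toInt
      case n =>
        val x = n.toDouble
        if (x > 0 && x <= 1.0) (x * numFeatures).ceil.toInt           // fraction of features in (0, 1.0]
        else x.toInt                                                  // absolute count in (1, numFeatures)
    }
}
```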
svn commit: r28458 - in /dev/spark/2.3.3-SNAPSHOT-2018_07_31_10_04-fc3df45-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jul 31 17:18:50 2018 New Revision: 28458 Log: Apache Spark 2.3.3-SNAPSHOT-2018_07_31_10_04-fc3df45 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24536] Validate that an evaluated limit clause cannot be null
Repository: spark Updated Branches: refs/heads/branch-2.3 25ea27b09 -> fc3df4517 [SPARK-24536] Validate that an evaluated limit clause cannot be null It proposes a version in which nullable expressions are not valid in the limit clause. It was tested with unit and e2e tests. Author: Mauro Palsgraaf Closes #21807 from mauropalsgraaf/SPARK-24536. (cherry picked from commit 4ac2126bc64bad1b4cbe1c697b4bcafacd67c96c) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc3df451 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc3df451 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc3df451 Branch: refs/heads/branch-2.3 Commit: fc3df45177d176cc0fe43049b6f8df372f7ea0e0 Parents: 25ea27b Author: Mauro Palsgraaf Authored: Tue Jul 31 08:18:08 2018 -0700 Committer: Xiao Li Committed: Tue Jul 31 08:22:25 2018 -0700 -- .../sql/catalyst/analysis/CheckAnalysis.scala | 14 +++--- .../catalyst/analysis/AnalysisErrorSuite.scala | 6 +++ .../test/resources/sql-tests/inputs/limit.sql | 5 +++ .../resources/sql-tests/results/limit.sql.out | 45 ++-- 4 files changed, 51 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fc3df451/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 0d189b4..beb11d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -66,11 +66,15 @@ trait CheckAnalysis extends PredicateHelper { limitExpr.sql) case e if e.dataType != IntegerType => failAnalysis( s"The limit expression must be integer type, but got " + - e.dataType.simpleString) - case e if e.eval().asInstanceOf[Int] < 0 => failAnalysis( -"The limit expression must be equal to or greater than 0, but got " + - e.eval().asInstanceOf[Int]) - case e => // OK + e.dataType.catalogString) + case e => +e.eval() match { + case null => failAnalysis( +s"The evaluated limit expression must not be null, but got ${limitExpr.sql}") + case v: Int if v < 0 => failAnalysis( +s"The limit expression must be equal to or greater than 0, but got $v") + case _ => // OK +} } } http://git-wip-us.apache.org/repos/asf/spark/blob/fc3df451/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 5d2f8e7..70325b8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -393,6 +393,12 @@ class AnalysisErrorSuite extends AnalysisTest { ) errorTest( +"an evaluated limit class must not be null", +testRelation.limit(Literal(null, IntegerType)), +"The evaluated limit expression must not be null, but got " :: Nil + ) + + errorTest( "num_rows in limit clause must be equal to or greater than 0", listRelation.limit(-1), "The limit expression must be equal to or greater than 0, but got -1" :: Nil 
http://git-wip-us.apache.org/repos/asf/spark/blob/fc3df451/sql/core/src/test/resources/sql-tests/inputs/limit.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/limit.sql b/sql/core/src/test/resources/sql-tests/inputs/limit.sql index f21912a..b4c73cf 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/limit.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/limit.sql @@ -13,6 +13,11 @@ SELECT * FROM testdata LIMIT CAST(1 AS int); SELECT * FROM testdata LIMIT -1; SELECT * FROM testData TABLESAMPLE (-1 ROWS); + +SELECT * FROM testdata LIMIT CAST(1 AS INT); +-- evaluated limit must not be null +SELECT * FROM testdata LIMIT CAST(NULL AS INT); + -- limit must be foldable SELECT * FROM testdata LIMIT key > 3;
spark git commit: [SPARK-24536] Validate that an evaluated limit clause cannot be null
Repository: spark Updated Branches: refs/heads/master b4fd75fb9 -> 4ac2126bc [SPARK-24536] Validate that an evaluated limit clause cannot be null ## What changes were proposed in this pull request? It proposes a version in which nullable expressions are not valid in the limit clause. ## How was this patch tested? It was tested with unit and e2e tests. Author: Mauro Palsgraaf Closes #21807 from mauropalsgraaf/SPARK-24536. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4ac2126b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4ac2126b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4ac2126b Branch: refs/heads/master Commit: 4ac2126bc64bad1b4cbe1c697b4bcafacd67c96c Parents: b4fd75f Author: Mauro Palsgraaf Authored: Tue Jul 31 08:18:08 2018 -0700 Committer: Xiao Li Committed: Tue Jul 31 08:18:08 2018 -0700 -- .../sql/catalyst/analysis/CheckAnalysis.scala | 12 -- .../catalyst/analysis/AnalysisErrorSuite.scala | 6 +++ .../test/resources/sql-tests/inputs/limit.sql | 5 +++ .../resources/sql-tests/results/limit.sql.out | 45 ++-- 4 files changed, 50 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4ac2126b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index f9478a1..4addc83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -68,10 +68,14 @@ trait CheckAnalysis extends PredicateHelper { case e if e.dataType != IntegerType => failAnalysis( s"The limit expression must be integer type, but got " + e.dataType.catalogString) - case e if e.eval().asInstanceOf[Int] < 0 => failAnalysis( -"The limit expression must be equal to or greater than 0, but got " + - e.eval().asInstanceOf[Int]) - case e => // OK + case e => +e.eval() match { + case null => failAnalysis( +s"The evaluated limit expression must not be null, but got ${limitExpr.sql}") + case v: Int if v < 0 => failAnalysis( +s"The limit expression must be equal to or greater than 0, but got $v") + case _ => // OK +} } } http://git-wip-us.apache.org/repos/asf/spark/blob/4ac2126b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index f4cfed4..ae8d77b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -400,6 +400,12 @@ class AnalysisErrorSuite extends AnalysisTest { ) errorTest( +"an evaluated limit class must not be null", +testRelation.limit(Literal(null, IntegerType)), +"The evaluated limit expression must not be null, but got " :: Nil + ) + + errorTest( "num_rows in limit clause must be equal to or greater than 0", listRelation.limit(-1), "The limit expression must be equal to or greater than 0, but got -1" :: Nil 
http://git-wip-us.apache.org/repos/asf/spark/blob/4ac2126b/sql/core/src/test/resources/sql-tests/inputs/limit.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/limit.sql b/sql/core/src/test/resources/sql-tests/inputs/limit.sql index f21912a..b4c73cf 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/limit.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/limit.sql @@ -13,6 +13,11 @@ SELECT * FROM testdata LIMIT CAST(1 AS int); SELECT * FROM testdata LIMIT -1; SELECT * FROM testData TABLESAMPLE (-1 ROWS); + +SELECT * FROM testdata LIMIT CAST(1 AS INT); +-- evaluated limit must not be null +SELECT * FROM testdata LIMIT CAST(NULL AS INT); + -- limit must be foldable SELECT * FROM testdata LIMIT key > 3; http://git-wip-us.apache.org/repos/asf/spark/blob/4ac2126b/sql/core/src/test/resources/sql-tests/results/limit.sql.out
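From the user's side, the check added above turns a confusing failure into a clear analysis error. The sketch below is illustrative only: it assumes a local session, uses a temporary view created on the fly to stand in for `testdata`, and the expected message text in the comment is taken from the new check.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object NullLimitExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("null-limit-demo").getOrCreate()
    spark.range(10).createOrReplaceTempView("testdata")
    try {
      // The limit expression is foldable but evaluates to null, so analysis now fails with
      // "The evaluated limit expression must not be null, but got CAST(NULL AS INT)".
      spark.sql("SELECT * FROM testdata LIMIT CAST(NULL AS INT)").show()
    } catch {
      case e: AnalysisException => println(e.message)
    } finally {
      spark.stop()
    }
  }
}
```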
svn commit: r28446 - in /dev/spark/2.4.0-SNAPSHOT-2018_07_31_00_02-b4fd75f-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jul 31 07:16:33 2018 New Revision: 28446 Log: Apache Spark 2.4.0-SNAPSHOT-2018_07_31_00_02-b4fd75f docs [This commit notification would consist of 1470 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24972][SQL] PivotFirst could not handle pivot columns of complex types
Repository: spark Updated Branches: refs/heads/master 8141d5592 -> b4fd75fb9 [SPARK-24972][SQL] PivotFirst could not handle pivot columns of complex types ## What changes were proposed in this pull request? When the pivot column is of a complex type, the eval() result will be an UnsafeRow, while the keys of the HashMap used for column value matching are GenericInternalRows. As a result, there will be no match and the result will always be empty. So for a pivot column of a complex type, we should: 1) If the complex type is not comparable (orderable), throw an exception. It cannot be a pivot column. 2) Otherwise, if it goes through the `PivotFirst` code path, `PivotFirst` should use a TreeMap instead of a HashMap for such columns. This PR has also reverted the workaround in Analyzer that had been introduced to avoid this `PivotFirst` issue. ## How was this patch tested? Added UT. Author: maryannxue Closes #21926 from maryannxue/pivot_followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4fd75fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4fd75fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4fd75fb Branch: refs/heads/master Commit: b4fd75fb9b615cfe592ad269cf20d02b483a0d33 Parents: 8141d55 Author: maryannxue Authored: Mon Jul 30 23:43:53 2018 -0700 Committer: Xiao Li Committed: Mon Jul 30 23:43:53 2018 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 12 +- .../expressions/aggregate/PivotFirst.scala | 11 +- .../test/resources/sql-tests/inputs/pivot.sql | 78 - .../resources/sql-tests/results/pivot.sql.out | 116 +-- 4 files changed, 199 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b4fd75fb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1488ede..76dc867 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -529,6 +529,10 @@ class Analyzer( || (p.groupByExprsOpt.isDefined && !p.groupByExprsOpt.get.forall(_.resolved)) || !p.pivotColumn.resolved || !p.pivotValues.forall(_.resolved) => p case Pivot(groupByExprsOpt, pivotColumn, pivotValues, aggregates, child) => +if (!RowOrdering.isOrderable(pivotColumn.dataType)) { + throw new AnalysisException( +s"Invalid pivot column '${pivotColumn}'. Pivot columns must be comparable.") +} // Check all aggregate expressions. aggregates.foreach(checkValidAggregateExpression) // Check all pivot values are literal and match pivot column data type. @@ -574,10 +578,14 @@ class Analyzer( // Since evaluating |pivotValues| if statements for each input row can get slow this is an // alternate plan that instead uses two steps of aggregation. 
val namedAggExps: Seq[NamedExpression] = aggregates.map(a => Alias(a, a.sql)()) - val bigGroup = groupByExprs ++ pivotColumn.references + val namedPivotCol = pivotColumn match { +case n: NamedExpression => n +case _ => Alias(pivotColumn, "__pivot_col")() + } + val bigGroup = groupByExprs :+ namedPivotCol val firstAgg = Aggregate(bigGroup, bigGroup ++ namedAggExps, child) val pivotAggs = namedAggExps.map { a => -Alias(PivotFirst(pivotColumn, a.toAttribute, evalPivotValues) +Alias(PivotFirst(namedPivotCol.toAttribute, a.toAttribute, evalPivotValues) .toAggregateExpression() , "__pivot_" + a.sql)() } http://git-wip-us.apache.org/repos/asf/spark/blob/b4fd75fb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala index 5237148..33bc5b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.catalyst.expressions.aggregate -import scala.collection.immutable.HashMap +import
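The root cause described above is easy to reproduce in miniature: a hash-based lookup needs key objects whose equality agrees, whereas the pivot-value keys and the evaluated pivot column can come back as different row representations (GenericInternalRow vs UnsafeRow) with the same content. The sketch below is a stand-alone illustration of that mismatch and of why an ordering-based map fixes it; `GenericRow` and `UnsafeRowLike` are made-up stand-ins, not Spark's classes.

```scala
import scala.collection.immutable.{HashMap, TreeMap}

sealed trait RowLike { def fields: (Int, Int) }
final case class GenericRow(fields: (Int, Int)) extends RowLike     // stands in for GenericInternalRow
final case class UnsafeRowLike(fields: (Int, Int)) extends RowLike  // stands in for UnsafeRow

object PivotMapSketch extends App {
  // Compare rows by content, ignoring the concrete class -- the analogue of PivotFirst
  // switching to a TreeMap with an interpreted ordering for orderable complex types.
  implicit val byContent: Ordering[RowLike] = Ordering.by[RowLike, (Int, Int)](_.fields)

  val hashed  = HashMap[RowLike, Int](GenericRow((1, 2)) -> 0)   // keys built from the pivot values
  val ordered = TreeMap[RowLike, Int](GenericRow((1, 2)) -> 0)

  val probe: RowLike = UnsafeRowLike((1, 2))                     // eval() hands back the other representation
  println(hashed.get(probe))   // None    -> no match, so the pivoted result ends up empty
  println(ordered.get(probe))  // Some(0) -> content-based ordering still matches
}
```

For pivot columns whose type has no ordering at all, there is nothing to fall back on, which is why the patch rejects them during analysis with the "Pivot columns must be comparable." error shown in the diff.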