[spark] branch branch-3.2 updated: [SPARK-34399][DOCS][FOLLOWUP] Add docs for the new metrics of task/job commit time
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new fa0c7f4 [SPARK-34399][DOCS][FOLLOWUP] Add docs for the new metrics of task/job commit time fa0c7f4 is described below commit fa0c7f487b67ec94f40b0fc914b6a8efff4eae8d Author: Gengliang Wang AuthorDate: Wed Jul 28 13:54:35 2021 +0800 [SPARK-34399][DOCS][FOLLOWUP] Add docs for the new metrics of task/job commit time ### What changes were proposed in this pull request? This is follow-up of https://github.com/apache/spark/pull/31522. It adds docs for the new metrics of task/job commit time ### Why are the changes needed? So that users can understand the metrics better and know that the new metrics are only for file table writes. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Build docs and preview: ![image](https://user-images.githubusercontent.com/1097932/127198210-2ab201d3-5fca-4065-ace6-0b930390380f.png) Closes #33542 from gengliangwang/addDocForMetrics. Authored-by: Gengliang Wang Signed-off-by: Wenchen Fan (cherry picked from commit c9a7ff3f36838fad5b62fa5d9be020aa465e4193) Signed-off-by: Wenchen Fan --- docs/web-ui.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/web-ui.md b/docs/web-ui.md index deaf50f..ec86945 100644 --- a/docs/web-ui.md +++ b/docs/web-ui.md @@ -404,7 +404,8 @@ Here is the list of SQL metrics: avg hash probe bucket list iters the average bucket list iterations per lookup during aggregation HashAggregate data size of build side the size of built hash map ShuffledHashJoin time to build hash map the time spent on building hash map ShuffledHashJoin - + task commit time the time spent on committing the output of a task after the writes succeed any write operation on a file-based table + job commit time the time spent on committing the output of a job after the writes succeed any write operation on a file-based table ## Structured Streaming Tab - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
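For readers who want to see the two new metrics in practice, here is a minimal sketch (not part of the commit; the output path and session setup are assumptions). Any write to a file-based table goes through the file commit protocol, so the write node in the SQL tab of the web UI should report both "task commit time" and "job commit time":

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Any write to a file-based table (Parquet, ORC, CSV, ...) goes through the file
# commit protocol, so its write node in the SQL tab reports both new metrics.
spark.range(1000).write.mode("overwrite").parquet("/tmp/commit-time-demo")
```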
[spark] branch master updated (59e0c25 -> c9a7ff3)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 59e0c25  [SPARK-36312][SQL] ParquetWriterSupport.setSchema should check inner field
     add c9a7ff3  [SPARK-34399][DOCS][FOLLOWUP] Add docs for the new metrics of task/job commit time

No new revisions were added by this update.

Summary of changes:
 docs/web-ui.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
[spark] branch branch-3.2 updated: [SPARK-36312][SQL] ParquetWriterSupport.setSchema should check inner field
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 3c44113 [SPARK-36312][SQL] ParquetWriterSupport.setSchema should check inner field 3c44113 is described below commit 3c441135bbf26cbffa8fe1310b01ef6afc3c21a7 Author: Angerszh AuthorDate: Wed Jul 28 13:52:27 2021 +0800 [SPARK-36312][SQL] ParquetWriterSupport.setSchema should check inner field ### What changes were proposed in this pull request? Last pr only support add inner field check for hive ddl, this pr add check for parquet data source write API. ### Why are the changes needed? Failed earlier ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added Ut Without this UI it failed as ``` [info] - SPARK-36312: ParquetWriteSupport should check inner field *** FAILED *** (8 seconds, 29 milliseconds) [info] Expected exception org.apache.spark.sql.AnalysisException to be thrown, but org.apache.spark.SparkException was thrown (HiveDDLSuite.scala:3035) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1563) [info] at org.scalatest.Assertions.intercept(Assertions.scala:756) [info] at org.scalatest.Assertions.intercept$(Assertions.scala:746) [info] at org.scalatest.funsuite.AnyFunSuite.intercept(AnyFunSuite.scala:1563) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$396(HiveDDLSuite.scala:3035) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$396$adapted(HiveDDLSuite.scala:3034) [info] at org.apache.spark.sql.catalyst.plans.SQLHelper.withTempPath(SQLHelper.scala:69) [info] at org.apache.spark.sql.catalyst.plans.SQLHelper.withTempPath$(SQLHelper.scala:66) [info] at org.apache.spark.sql.QueryTest.withTempPath(QueryTest.scala:34) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$395(HiveDDLSuite.scala:3034) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [info] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468) [info] at org.apache.spark.sql.test.SQLTestUtilsBase.withView(SQLTestUtils.scala:316) [info] at org.apache.spark.sql.test.SQLTestUtilsBase.withView$(SQLTestUtils.scala:314) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.withView(HiveDDLSuite.scala:396) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$394(HiveDDLSuite.scala:3032) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) [info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226) [info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:190) [info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236) [info] at 
org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218) [info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:62) [info] at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) [info] at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) [info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:62) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269) [info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) [info] at scala.collection.immutable.List.foreach(List.scala:431) [info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) [info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) [info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) [info] at
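Setting the truncated stack trace aside, the behaviour this patch enforces can be sketched as follows (an illustration based on the test described above, not code from the commit; the struct field name and output path are assumptions). Writing a struct whose inner field name contains characters Parquet cannot store should now fail early with an AnalysisException rather than a later SparkException:

```python
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.getOrCreate()

# The inner field name "a b" contains a space, which is not a legal Parquet field name.
df = spark.sql("SELECT named_struct('a b', 1) AS s")

try:
    df.write.mode("overwrite").parquet("/tmp/inner-field-check")
except AnalysisException as e:
    # With the inner-field check in place, the failure surfaces here, during analysis.
    print("rejected:", e)
```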
[spark] branch master updated: [SPARK-36312][SQL] ParquetWriterSupport.setSchema should check inner field
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 59e0c25 [SPARK-36312][SQL] ParquetWriterSupport.setSchema should check inner field 59e0c25 is described below commit 59e0c25376e1b3d227a1dc9ed93a7593314eddb3 Author: Angerszh AuthorDate: Wed Jul 28 13:52:27 2021 +0800 [SPARK-36312][SQL] ParquetWriterSupport.setSchema should check inner field ### What changes were proposed in this pull request? Last pr only support add inner field check for hive ddl, this pr add check for parquet data source write API. ### Why are the changes needed? Failed earlier ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added Ut Without this UI it failed as ``` [info] - SPARK-36312: ParquetWriteSupport should check inner field *** FAILED *** (8 seconds, 29 milliseconds) [info] Expected exception org.apache.spark.sql.AnalysisException to be thrown, but org.apache.spark.SparkException was thrown (HiveDDLSuite.scala:3035) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1563) [info] at org.scalatest.Assertions.intercept(Assertions.scala:756) [info] at org.scalatest.Assertions.intercept$(Assertions.scala:746) [info] at org.scalatest.funsuite.AnyFunSuite.intercept(AnyFunSuite.scala:1563) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$396(HiveDDLSuite.scala:3035) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$396$adapted(HiveDDLSuite.scala:3034) [info] at org.apache.spark.sql.catalyst.plans.SQLHelper.withTempPath(SQLHelper.scala:69) [info] at org.apache.spark.sql.catalyst.plans.SQLHelper.withTempPath$(SQLHelper.scala:66) [info] at org.apache.spark.sql.QueryTest.withTempPath(QueryTest.scala:34) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$395(HiveDDLSuite.scala:3034) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [info] at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1468) [info] at org.apache.spark.sql.test.SQLTestUtilsBase.withView(SQLTestUtils.scala:316) [info] at org.apache.spark.sql.test.SQLTestUtilsBase.withView$(SQLTestUtils.scala:314) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.withView(HiveDDLSuite.scala:396) [info] at org.apache.spark.sql.hive.execution.HiveDDLSuite.$anonfun$new$394(HiveDDLSuite.scala:3032) [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) [info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226) [info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:190) [info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236) [info] at 
org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236) [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218) [info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:62) [info] at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) [info] at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) [info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:62) [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269) [info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) [info] at scala.collection.immutable.List.foreach(List.scala:431) [info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) [info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) [info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) [info] at
[spark] branch branch-3.2 updated: [SPARK-35639][SQL] Add metrics about coalesced partitions to AQEShuffleRead in AQE
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new c59e54f [SPARK-35639][SQL] Add metrics about coalesced partitions to AQEShuffleRead in AQE c59e54f is described below commit c59e54fe0e35315504132340135676955c7e0d16 Author: Eugene Koifman AuthorDate: Wed Jul 28 13:49:48 2021 +0800 [SPARK-35639][SQL] Add metrics about coalesced partitions to AQEShuffleRead in AQE ### What changes were proposed in this pull request? AQEShuffleReadExec already reports "number of skewed partitions" and "number of skewed partition splits". It would be useful to also report "number of coalesced partitions" and for ShuffleExchange to report "number of partitions" This way it's clear what happened on the map side and on the reduce side. ![Metrics](https://user-images.githubusercontent.com/4297661/126729820-cf01b3fa-7bc4-44a5-8098-91689766a68a.png) ### Why are the changes needed? Improves usability ### Does this PR introduce _any_ user-facing change? Yes, it now provides more information about `AQEShuffleReadExec` operator behavior in the metrics system. ### How was this patch tested? Existing tests Closes #32776 from ekoifman/PRISM-91635-customshufflereader-sql-metrics. Authored-by: Eugene Koifman Signed-off-by: Wenchen Fan (cherry picked from commit 41a16ebf1196bec86aec104e72fd7fb1597c0073) Signed-off-by: Wenchen Fan --- .../execution/adaptive/AQEShuffleReadExec.scala| 30 +- .../execution/exchange/ShuffleExchangeExec.scala | 10 ++-- .../scala/org/apache/spark/sql/ExplainSuite.scala | 1 + .../adaptive/AdaptiveQueryExecSuite.scala | 3 ++- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala index d897507..0768b9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala @@ -90,15 +90,19 @@ case class AQEShuffleReadExec private( } /** + * Returns true iff some partitions were actually combined + */ + private def isCoalesced(spec: ShufflePartitionSpec) = spec match { +case CoalescedPartitionSpec(0, 0, _) => true +case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1 +case _ => false + } + + /** * Returns true iff some non-empty partitions were combined */ def hasCoalescedPartition: Boolean = { -partitionSpecs.exists { - // shuffle from empty RDD - case CoalescedPartitionSpec(0, 0, _) => true - case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1 - case _ => false -} +partitionSpecs.exists(isCoalesced) } def hasSkewedPartition: Boolean = @@ -153,6 +157,13 @@ case class AQEShuffleReadExec private( driverAccumUpdates += (skewedSplits.id -> numSplits) } +if (hasCoalescedPartition) { + val numCoalescedPartitionsMetric = metrics("numCoalescedPartitions") + val x = partitionSpecs.count(isCoalesced) + numCoalescedPartitionsMetric.set(x) + driverAccumUpdates += numCoalescedPartitionsMetric.id -> x +} + partitionDataSizes.foreach { dataSizes => val partitionDataSizeMetrics = metrics("partitionDataSize") driverAccumUpdates ++= dataSizes.map(partitionDataSizeMetrics.id -> _) @@ -183,6 +194,13 @@ case class AQEShuffleReadExec private( } else { Map.empty } + } ++ 
{ +if (hasCoalescedPartition) { + Map("numCoalescedPartitions" -> +SQLMetrics.createMetric(sparkContext, "number of coalesced partitions")) +} else { + Map.empty +} } } else { // It's a canonicalized plan, no need to report metrics. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 5a45af6..c033aed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -123,7 +123,8 @@ case class ShuffleExchangeExec( private[sql] lazy val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics = Map( -"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size") +"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data
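The diff above adds the metric on the shuffle-read side. As a rough illustration of where it shows up (a sketch, not from the commit; the grouping query is arbitrary), enabling AQE partition coalescing on a small shuffle makes the AQEShuffleRead node report "number of coalesced partitions" alongside the existing skew metrics:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# A tiny shuffle: most of the default 200 shuffle partitions are empty or small,
# so AQE combines them and AQEShuffleRead reports "number of coalesced partitions".
spark.range(0, 1000).groupBy((col("id") % 10).alias("k")).count().collect()
```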
[spark] branch master updated (23a6ffa -> 41a16eb)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 23a6ffa  [SPARK-36275][SQL] ResolveAggregateFunctions should works with nested fields
     add 41a16eb  [SPARK-35639][SQL] Add metrics about coalesced partitions to AQEShuffleRead in AQE

No new revisions were added by this update.

Summary of changes:
 .../execution/adaptive/AQEShuffleReadExec.scala   | 30 +-
 .../execution/exchange/ShuffleExchangeExec.scala  | 10 ++--
 .../scala/org/apache/spark/sql/ExplainSuite.scala |  1 +
 .../adaptive/AdaptiveQueryExecSuite.scala         |  3 ++-
 4 files changed, 35 insertions(+), 9 deletions(-)
[spark] branch master updated (c8dd97d -> 23a6ffa)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from c8dd97d  [SPARK-34952][SQL][FOLLOW-UP] DSv2 aggregate push down follow-up
     add 23a6ffa  [SPARK-36275][SQL] ResolveAggregateFunctions should works with nested fields

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/analysis/Analyzer.scala | 6 +++--
 .../sql/catalyst/analysis/AnalysisSuite.scala  | 26 ++
 2 files changed, 30 insertions(+), 2 deletions(-)
[spark] branch branch-3.2 updated: [SPARK-36275][SQL] ResolveAggregateFunctions should works with nested fields
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 993ffaf [SPARK-36275][SQL] ResolveAggregateFunctions should works with nested fields 993ffaf is described below commit 993ffafc3e2e7f5b2ce7057a9fec9b061153c462 Author: allisonwang-db AuthorDate: Wed Jul 28 13:35:17 2021 +0800 [SPARK-36275][SQL] ResolveAggregateFunctions should works with nested fields ### What changes were proposed in this pull request? This PR fixes an issue in `ResolveAggregateFunctions` where non-aggregated nested fields in ORDER BY and HAVING are not resolved correctly. This is because nested fields are resolved as aliases that fail to be semantically equal to any grouping/aggregate expressions. ### Why are the changes needed? To fix an analyzer issue. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests. Closes #33498 from allisonwang-db/spark-36275-resolve-agg-func. Authored-by: allisonwang-db Signed-off-by: Wenchen Fan (cherry picked from commit 23a6ffa5dc6d2330ea1c3e2b0890328e7d2d0f5d) Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/analysis/Analyzer.scala | 6 +++-- .../sql/catalyst/analysis/AnalysisSuite.scala | 26 ++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index ee7b342..6e571e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2553,8 +2553,10 @@ class Analyzer(override val catalogManager: CatalogManager) // a table `t` has two columns `c1` and `c2`, for query `SELECT ... FROM t // GROUP BY c1 HAVING c2 = 0`, even though we can resolve column `c2` here, we // should undo it later and fail with "Column c2 not found". 
- agg.child.resolve(u.nameParts, resolver).map(TempResolvedColumn(_, u.nameParts)) -.getOrElse(u) + agg.child.resolve(u.nameParts, resolver).map({ +case a: Alias => TempResolvedColumn(a.child, u.nameParts) +case o => TempResolvedColumn(o, u.nameParts) + }).getOrElse(u) } catch { case _: AnalysisException => u } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 5cef243..6ddc6b7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -1115,4 +1115,30 @@ class AnalysisSuite extends AnalysisTest with Matchers { Seq("grouping_id() can only be used with GroupingSets/Cube/Rollup"), false) } + + test("SPARK-36275: Resolve aggregate functions should work with nested fields") { +assertAnalysisSuccess(parsePlan( + """ +|SELECT c.x, SUM(c.y) +|FROM VALUES NAMED_STRUCT('x', 'A', 'y', 1), NAMED_STRUCT('x', 'A', 'y', 2) AS t(c) +|GROUP BY c.x +|HAVING c.x > 1 +|""".stripMargin)) + +assertAnalysisSuccess(parsePlan( + """ +|SELECT c.x, SUM(c.y) +|FROM VALUES NAMED_STRUCT('x', 'A', 'y', 1), NAMED_STRUCT('x', 'A', 'y', 2) AS t(c) +|GROUP BY c.x +|ORDER BY c.x +|""".stripMargin)) + +assertAnalysisError(parsePlan( + """ +|SELECT c.x +|FROM VALUES NAMED_STRUCT('x', 'A', 'y', 1), NAMED_STRUCT('x', 'A', 'y', 2) AS t(c) +|GROUP BY c.x +|ORDER BY c.x + c.y +|""".stripMargin), "cannot resolve 'c.y' given input columns: [x]" :: Nil) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
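The queries added to AnalysisSuite can also be run end to end; a sketch mirroring the first test case (assuming a Spark session named `spark`):

```python
# Grouping by a nested field and referencing it again in HAVING resolves correctly
# after this change; this mirrors the first query added to AnalysisSuite.
spark.sql("""
    SELECT c.x, SUM(c.y)
    FROM VALUES NAMED_STRUCT('x', 'A', 'y', 1), NAMED_STRUCT('x', 'A', 'y', 2) AS t(c)
    GROUP BY c.x
    HAVING c.x > 1
""").show()
```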
[spark] branch branch-3.2 updated: [SPARK-36028][SQL][3.2] Allow Project to host outer references in scalar subqueries
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new aea36aa [SPARK-36028][SQL][3.2] Allow Project to host outer references in scalar subqueries aea36aa is described below commit aea36aa977e32a766cc71b39d41cb884905ddd3b Author: allisonwang-db AuthorDate: Wed Jul 28 12:54:15 2021 +0800 [SPARK-36028][SQL][3.2] Allow Project to host outer references in scalar subqueries This PR cherry picks https://github.com/apache/spark/pull/33235 to branch-3.2 to fix test failures introduced by https://github.com/apache/spark/pull/33284. ### What changes were proposed in this pull request? This PR allows the `Project` node to host outer references in scalar subqueries when `decorrelateInnerQuery` is enabled. It is already supported by the new decorrelation framework and the `RewriteCorrelatedScalarSubquery` rule. Note currently by default all correlated subqueries will be decorrelated, which is not necessarily the most optimal approach. Consider `SELECT (SELECT c1) FROM t`. This should be optimized as `SELECT c1 FROM t` instead of rewriting it as a left outer join. This will be done in a separate PR to optimize correlated scalar/lateral subqueries with OneRowRelation. ### Why are the changes needed? To allow more types of correlated scalar subqueries. ### Does this PR introduce _any_ user-facing change? Yes. This PR allows outer query column references in the SELECT cluase of a correlated scalar subquery. For example: ```sql SELECT (SELECT c1) FROM t; ``` Before this change: ``` org.apache.spark.sql.AnalysisException: Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses ``` After this change: ``` +--+ |scalarsubquery(c1)| +--+ |0 | |1 | +--+ ``` ### How was this patch tested? Added unit tests and SQL tests. (cherry picked from commit ca348e50a4edbd857ec86e4e9693fa4bcbab54b7) Signed-off-by: allisonwang-db Closes #33527 from allisonwang-db/spark-36028-3.2. Authored-by: allisonwang-db Signed-off-by: Wenchen Fan --- .../sql/catalyst/analysis/CheckAnalysis.scala | 23 +++-- .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 7 -- .../catalyst/analysis/ResolveSubquerySuite.scala | 26 +- .../scalar-subquery/scalar-subquery-select.sql | 9 +- .../scalar-subquery/scalar-subquery-select.sql.out | 97 +- 5 files changed, 144 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index e439085..c157848 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -725,9 +725,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { s"Filter/Aggregate/Project and a few commands: $plan") } } +// Validate to make sure the correlations appearing in the query are valid and +// allowed by spark. +checkCorrelationsInSubquery(expr.plan, isScalarOrLateral = true) case _: LateralSubquery => assert(plan.isInstanceOf[LateralJoin]) +// Validate to make sure the correlations appearing in the query are valid and +// allowed by spark. 
+checkCorrelationsInSubquery(expr.plan, isScalarOrLateral = true) case inSubqueryOrExistsSubquery => plan match { @@ -736,11 +742,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" + s" Filter/Join and a few commands: $plan") } +// Validate to make sure the correlations appearing in the query are valid and +// allowed by spark. +checkCorrelationsInSubquery(expr.plan) } - -// Validate to make sure the correlations appearing in the query are valid and -// allowed by spark. -checkCorrelationsInSubquery(expr.plan, isLateral = plan.isInstanceOf[LateralJoin]) } /** @@ -779,7 +784,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { * Validates to make sure the outer references appearing inside the subquery * are allowed. */ - private def checkCorrelationsInSubquery(sub: LogicalPlan, isLateral: Boolean = false): Unit = { + private def checkCorrelationsInSubquery( + sub: LogicalPlan, + isScalarOrLateral: Boolean =
[spark] branch branch-3.2 updated: [SPARK-34952][SQL][FOLLOW-UP] DSv2 aggregate push down follow-up
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 33ef52e [SPARK-34952][SQL][FOLLOW-UP] DSv2 aggregate push down follow-up 33ef52e is described below commit 33ef52e2c0856c0188d868a6cfb5f38b3d922f2f Author: Huaxin Gao AuthorDate: Wed Jul 28 12:52:42 2021 +0800 [SPARK-34952][SQL][FOLLOW-UP] DSv2 aggregate push down follow-up ### What changes were proposed in this pull request? update java doc, JDBC data source doc, address follow up comments ### Why are the changes needed? update doc and address follow up comments ### Does this PR introduce _any_ user-facing change? Yes, add the new JDBC option `pushDownAggregate` in JDBC data source doc. ### How was this patch tested? manually checked Closes #33526 from huaxingao/aggPD_followup. Authored-by: Huaxin Gao Signed-off-by: Wenchen Fan (cherry picked from commit c8dd97d4566e4cd6865437c2640467c9c16080d4) Signed-off-by: Wenchen Fan --- docs/sql-data-sources-jdbc.md | 9 + .../sql/connector/expressions/Aggregation.java | 12 +++ .../spark/sql/connector/expressions/Count.java | 28 +++ .../spark/sql/connector/expressions/CountStar.java | 14 .../spark/sql/connector/expressions/Max.java | 18 -- .../spark/sql/connector/expressions/Min.java | 20 --- .../spark/sql/connector/expressions/Sum.java | 40 +- .../connector/read/SupportsPushDownAggregates.java | 8 ++--- .../sql/execution/datasources/jdbc/JDBCRDD.scala | 8 ++--- .../execution/datasources/v2/PushDownUtils.scala | 10 ++ .../datasources/v2/V2ScanRelationPushDown.scala| 4 +-- .../datasources/v2/jdbc/JDBCScanBuilder.scala | 4 +-- .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala| 2 +- 13 files changed, 78 insertions(+), 99 deletions(-) diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md index c973e8a..315f476 100644 --- a/docs/sql-data-sources-jdbc.md +++ b/docs/sql-data-sources-jdbc.md @@ -238,6 +238,15 @@ logging into the data sources. +pushDownAggregate +false + + The option to enable or disable aggregate push-down into the JDBC data source. The default value is false, in which case Spark will not push down aggregates to the JDBC data source. Otherwise, if sets to true, aggregates will be pushed down to the JDBC data source. Aggregate push-down is usually turned off when the aggregate is performed faster by Spark than by the JDBC data source. Please note that aggregates can be pushed down if and only if all the aggregate functions and the rel [...] 
+ +read + + + keytab (none) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Aggregation.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Aggregation.java index fdf3031..8eb3491 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Aggregation.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Aggregation.java @@ -28,19 +28,15 @@ import java.io.Serializable; */ @Evolving public final class Aggregation implements Serializable { - private AggregateFunc[] aggregateExpressions; - private FieldReference[] groupByColumns; + private final AggregateFunc[] aggregateExpressions; + private final FieldReference[] groupByColumns; public Aggregation(AggregateFunc[] aggregateExpressions, FieldReference[] groupByColumns) { this.aggregateExpressions = aggregateExpressions; this.groupByColumns = groupByColumns; } - public AggregateFunc[] aggregateExpressions() { -return aggregateExpressions; - } + public AggregateFunc[] aggregateExpressions() { return aggregateExpressions; } - public FieldReference[] groupByColumns() { -return groupByColumns; - } + public FieldReference[] groupByColumns() { return groupByColumns; } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Count.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Count.java index 17562a1..0e28a93 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Count.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Count.java @@ -26,24 +26,20 @@ import org.apache.spark.annotation.Evolving; */ @Evolving public final class Count implements AggregateFunc { -private FieldReference column; -private boolean isDistinct; + private final FieldReference column; + private final boolean isDistinct; -public Count(FieldReference column, boolean isDistinct) { -this.column = column; -this.isDistinct =
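A hedged usage sketch of the `pushDownAggregate` option documented above (the JDBC URL, table, and credentials are placeholders, not from the commit). With the option set to `true`, eligible aggregates over a JDBC table may be evaluated by the database instead of by Spark:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

orders = (spark.read.format("jdbc")
          .option("url", "jdbc:postgresql://localhost:5432/shop")  # placeholder URL
          .option("dbtable", "orders")                             # placeholder table
          .option("user", "reader")                                # placeholder user
          .option("pushDownAggregate", "true")
          .load())

# When every aggregate function and grouping column is supported by the source,
# the aggregation below may be evaluated by the database; explain() shows whether
# the push-down actually happened.
orders.groupBy("customer_id").count().explain()
```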
[spark] branch master updated (1614d00 -> c8dd97d)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 1614d00  [SPARK-36318][SQL][DOCS] Update docs about mapping of ANSI interval types to Java/Scala/SQL types
     add c8dd97d  [SPARK-34952][SQL][FOLLOW-UP] DSv2 aggregate push down follow-up

No new revisions were added by this update.

Summary of changes:
 docs/sql-data-sources-jdbc.md                       |  9 +
 .../sql/connector/expressions/Aggregation.java      | 12 +++
 .../spark/sql/connector/expressions/Count.java      | 28 +++
 .../spark/sql/connector/expressions/CountStar.java  | 14
 .../spark/sql/connector/expressions/Max.java        | 18 --
 .../spark/sql/connector/expressions/Min.java        | 20 ---
 .../spark/sql/connector/expressions/Sum.java        | 40 +-
 .../connector/read/SupportsPushDownAggregates.java  |  8 ++---
 .../sql/execution/datasources/jdbc/JDBCRDD.scala    |  8 ++---
 .../execution/datasources/v2/PushDownUtils.scala    | 10 ++
 .../datasources/v2/V2ScanRelationPushDown.scala     |  4 +--
 .../datasources/v2/jdbc/JDBCScanBuilder.scala       |  4 +--
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala     |  2 +-
 13 files changed, 78 insertions(+), 99 deletions(-)
[spark] branch branch-3.2 updated: [SPARK-36318][SQL][DOCS] Update docs about mapping of ANSI interval types to Java/Scala/SQL types
This is an automated email from the ASF dual-hosted git repository. sarutak pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 56f1ee4 [SPARK-36318][SQL][DOCS] Update docs about mapping of ANSI interval types to Java/Scala/SQL types 56f1ee4 is described below commit 56f1ee4b066ca5123c207acd91f27ee80a3bb07b Author: Max Gekk AuthorDate: Wed Jul 28 13:42:35 2021 +0900 [SPARK-36318][SQL][DOCS] Update docs about mapping of ANSI interval types to Java/Scala/SQL types ### What changes were proposed in this pull request? 1. Update the tables at https://spark.apache.org/docs/latest/sql-ref-datatypes.html about mapping ANSI interval types to Java/Scala/SQL types. 2. Remove `CalendarIntervalType` from the table of mapping Catalyst types to SQL types. https://user-images.githubusercontent.com/1580697/127204790-7ccb9c64-daf2-427d-963e-b7367aaa3439.png;> https://user-images.githubusercontent.com/1580697/127204806-a0a51950-3c2d-4198-8a22-0f6614bb1487.png;> ### Why are the changes needed? To inform users which types from language APIs should be used as ANSI interval types. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually checking by building the docs: ``` $ SKIP_RDOC=1 SKIP_API=1 SKIP_PYTHONDOC=1 bundle exec jekyll build ``` Closes #33543 from MaxGekk/doc-interval-type-lang-api. Authored-by: Max Gekk Signed-off-by: Kousuke Saruta (cherry picked from commit 1614d004174c1aeda0c1511d3cba92cf55fc14b0) Signed-off-by: Kousuke Saruta --- docs/sql-ref-datatypes.md | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/docs/sql-ref-datatypes.md b/docs/sql-ref-datatypes.md index 89ffa34..d699bfe 100644 --- a/docs/sql-ref-datatypes.md +++ b/docs/sql-ref-datatypes.md @@ -125,6 +125,8 @@ You can access them by doing |**BooleanType**|Boolean|BooleanType| |**TimestampType**|java.sql.Timestamp|TimestampType| |**DateType**|java.sql.Date|DateType| +|**YearMonthIntervalType**|java.time.Period|YearMonthIntervalType| +|**DayTimeIntervalType**|java.time.Duration|DayTimeIntervalType| |**ArrayType**|scala.collection.Seq|ArrayType(*elementType*, [*containsNull]*)**Note:** The default value of *containsNull* is true.| |**MapType**|scala.collection.Map|MapType(*keyType*, *valueType*, [*valueContainsNull]*)**Note:** The default value of *valueContainsNull* is true.| |**StructType**|org.apache.spark.sql.Row|StructType(*fields*)**Note:** *fields* is a Seq of StructFields. 
Also, two fields with the same name are not allowed.| @@ -153,6 +155,8 @@ please use factory methods provided in |**BooleanType**|boolean or Boolean|DataTypes.BooleanType| |**TimestampType**|java.sql.Timestamp|DataTypes.TimestampType| |**DateType**|java.sql.Date|DataTypes.DateType| +|**YearMonthIntervalType**|java.time.Period|YearMonthIntervalType| +|**DayTimeIntervalType**|java.time.Duration|DayTimeIntervalType| |**ArrayType**|java.util.List|DataTypes.createArrayType(*elementType*)**Note:** The value of *containsNull* will be true.DataTypes.createArrayType(*elementType*, *containsNull*).| |**MapType**|java.util.Map|DataTypes.createMapType(*keyType*, *valueType*)**Note:** The value of *valueContainsNull* will be true.DataTypes.createMapType(*keyType*, *valueType*, *valueContainsNull*)| |**StructType**|org.apache.spark.sql.Row|DataTypes.createStructType(*fields*)**Note:** *fields* is a List or an array of StructFields.Also, two fields with the same name are not allowed.| @@ -230,7 +234,8 @@ The following table shows the type names as well as aliases used in Spark SQL pa |**StringType**|STRING| |**BinaryType**|BINARY| |**DecimalType**|DECIMAL, DEC, NUMERIC| -|**CalendarIntervalType**|INTERVAL| +|**YearMonthIntervalType**|INTERVAL YEAR, INTERVAL YEAR TO MONTH, INTERVAL MONTH| +|**DayTimeIntervalType**|INTERVAL DAY, INTERVAL DAY TO HOUR, INTERVAL DAY TO MINUTE, INTERVAL DAY TO SECOND, INTERVAL HOUR, INTERVAL HOUR TO MINUTE, INTERVAL HOUR TO SECOND, INTERVAL MINUTE, INTERVAL MINUTE TO SECOND, INTERVAL SECOND| |**ArrayType**|ARRAY\| |**StructType**|STRUCT **Note:** ':' is optional.| |**MapType**|MAP| - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
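A small sketch of the SQL type names listed in the updated tables (not part of the commit; assuming a Spark session named `spark`, and the exact schema output may vary slightly by version). ANSI interval literals produce columns of YearMonthIntervalType and DayTimeIntervalType, which per the tables above map to java.time.Period and java.time.Duration on the JVM side:

```python
df = spark.sql("""
    SELECT INTERVAL '1-2' YEAR TO MONTH        AS ym,
           INTERVAL '1 02:03:04' DAY TO SECOND AS dt
""")
df.printSchema()
# Expected output (may vary slightly by version):
# root
#  |-- ym: interval year to month (nullable = false)
#  |-- dt: interval day to second (nullable = false)
```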
[spark] branch master updated (1fafa8e -> 1614d00)
This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 1fafa8e  [SPARK-36314][SS] Update Sessionization examples to use native support of session window
     add 1614d00  [SPARK-36318][SQL][DOCS] Update docs about mapping of ANSI interval types to Java/Scala/SQL types

No new revisions were added by this update.

Summary of changes:
 docs/sql-ref-datatypes.md | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
[spark] branch branch-3.2 updated: [SPARK-36314][SS] Update Sessionization examples to use native support of session window
This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 16c60099 [SPARK-36314][SS] Update Sessionization examples to use native support of session window 16c60099 is described below commit 16c60099577cbfa7e2e0a69badb5baf6d9164f93 Author: Jungtaek Lim AuthorDate: Tue Jul 27 20:10:02 2021 -0700 [SPARK-36314][SS] Update Sessionization examples to use native support of session window ### What changes were proposed in this pull request? This PR proposes to update Sessionization examples to use native support of session window. It also adds the example for PySpark as native support of session window is available to PySpark as well. ### Why are the changes needed? We should guide the simplest way to achieve the same workload. I'll provide another example for cases we can't do with native support of session window. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually tested. Closes #33548 from HeartSaVioR/SPARK-36314. Authored-by: Jungtaek Lim Signed-off-by: Liang-Chi Hsieh (cherry picked from commit 1fafa8e191430d7a8d6a6eb5aa19056108f310c9) Signed-off-by: Liang-Chi Hsieh --- .../streaming/JavaStructuredSessionization.java| 193 ++--- .../sql/streaming/structured_sessionization.py | 87 ++ .../sql/streaming/StructuredSessionization.scala | 84 ++--- 3 files changed, 112 insertions(+), 252 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java index 34ee235..eb7ce11 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java @@ -16,22 +16,15 @@ */ package org.apache.spark.examples.sql.streaming; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.MapGroupsWithStateFunction; import org.apache.spark.sql.*; -import org.apache.spark.sql.streaming.GroupState; -import org.apache.spark.sql.streaming.GroupStateTimeout; import org.apache.spark.sql.streaming.StreamingQuery; -import java.io.Serializable; -import java.sql.Timestamp; -import java.util.*; +import static org.apache.spark.sql.functions.*; /** * Counts words in UTF8 encoded, '\n' delimited text received from the network. * - * Usage: JavaStructuredNetworkWordCount + * Usage: JavaStructuredSessionization * and describe the TCP server that Structured Streaming * would connect to receive data. 
* @@ -66,86 +59,20 @@ public final class JavaStructuredSessionization { .option("includeTimestamp", true) .load(); -FlatMapFunction linesToEvents = - new FlatMapFunction() { -@Override -public Iterator call(LineWithTimestamp lineWithTimestamp) { - ArrayList eventList = new ArrayList<>(); - for (String word : lineWithTimestamp.getLine().split(" ")) { -eventList.add(new Event(word, lineWithTimestamp.getTimestamp())); - } - return eventList.iterator(); -} - }; +// Split the lines into words, retaining timestamps +// split() splits each line into an array, and explode() turns the array into multiple rows +// treat words as sessionId of events +Dataset events = lines +.selectExpr("explode(split(value, ' ')) AS sessionId", "timestamp AS eventTime"); -// Split the lines into words, treat words as sessionId of events -Dataset events = lines -.withColumnRenamed("value", "line") -.as(Encoders.bean(LineWithTimestamp.class)) -.flatMap(linesToEvents, Encoders.bean(Event.class)); - -// Sessionize the events. Track number of events, start and end timestamps of session, and +// Sessionize the events. Track number of events, start and end timestamps of session, // and report session updates. -// -// Step 1: Define the state update function -MapGroupsWithStateFunction stateUpdateFunc = - new MapGroupsWithStateFunction() { -@Override public SessionUpdate call( -String sessionId, Iterator events, GroupState state) { - // If timed out, then remove session and send final update - if (state.hasTimedOut()) { -SessionUpdate finalUpdate = new SessionUpdate( -sessionId, state.get().calculateDuration(), state.get().getNumEvents(), true); -state.remove(); -return
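For reference, the native API the updated examples rely on can be sketched in PySpark as follows (an illustration, not an excerpt from the commit; the host, port, and gap duration are assumptions):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, session_window

spark = SparkSession.builder.getOrCreate()

lines = (spark.readStream.format("socket")
         .option("host", "localhost").option("port", 9999)
         .option("includeTimestamp", True)
         .load())

# As in the example: each word becomes a sessionId and the ingest timestamp is kept.
events = lines.select(explode(split(lines.value, " ")).alias("sessionId"),
                      lines.timestamp.alias("eventTime"))

# Native session windows: events whose gap is below the threshold fall into one session.
sessions = (events
            .groupBy(session_window(events.eventTime, "10 seconds"), events.sessionId)
            .count())
```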
[spark] branch master updated: [SPARK-36314][SS] Update Sessionization examples to use native support of session window
This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 1fafa8e [SPARK-36314][SS] Update Sessionization examples to use native support of session window 1fafa8e is described below commit 1fafa8e191430d7a8d6a6eb5aa19056108f310c9 Author: Jungtaek Lim AuthorDate: Tue Jul 27 20:10:02 2021 -0700 [SPARK-36314][SS] Update Sessionization examples to use native support of session window ### What changes were proposed in this pull request? This PR proposes to update Sessionization examples to use native support of session window. It also adds the example for PySpark as native support of session window is available to PySpark as well. ### Why are the changes needed? We should guide the simplest way to achieve the same workload. I'll provide another example for cases we can't do with native support of session window. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually tested. Closes #33548 from HeartSaVioR/SPARK-36314. Authored-by: Jungtaek Lim Signed-off-by: Liang-Chi Hsieh --- .../streaming/JavaStructuredSessionization.java| 193 ++--- .../sql/streaming/structured_sessionization.py | 87 ++ .../sql/streaming/StructuredSessionization.scala | 84 ++--- 3 files changed, 112 insertions(+), 252 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java index 34ee235..eb7ce11 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java @@ -16,22 +16,15 @@ */ package org.apache.spark.examples.sql.streaming; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.MapGroupsWithStateFunction; import org.apache.spark.sql.*; -import org.apache.spark.sql.streaming.GroupState; -import org.apache.spark.sql.streaming.GroupStateTimeout; import org.apache.spark.sql.streaming.StreamingQuery; -import java.io.Serializable; -import java.sql.Timestamp; -import java.util.*; +import static org.apache.spark.sql.functions.*; /** * Counts words in UTF8 encoded, '\n' delimited text received from the network. * - * Usage: JavaStructuredNetworkWordCount + * Usage: JavaStructuredSessionization * and describe the TCP server that Structured Streaming * would connect to receive data. 
* @@ -66,86 +59,20 @@ public final class JavaStructuredSessionization { .option("includeTimestamp", true) .load(); -FlatMapFunction linesToEvents = - new FlatMapFunction() { -@Override -public Iterator call(LineWithTimestamp lineWithTimestamp) { - ArrayList eventList = new ArrayList<>(); - for (String word : lineWithTimestamp.getLine().split(" ")) { -eventList.add(new Event(word, lineWithTimestamp.getTimestamp())); - } - return eventList.iterator(); -} - }; +// Split the lines into words, retaining timestamps +// split() splits each line into an array, and explode() turns the array into multiple rows +// treat words as sessionId of events +Dataset events = lines +.selectExpr("explode(split(value, ' ')) AS sessionId", "timestamp AS eventTime"); -// Split the lines into words, treat words as sessionId of events -Dataset events = lines -.withColumnRenamed("value", "line") -.as(Encoders.bean(LineWithTimestamp.class)) -.flatMap(linesToEvents, Encoders.bean(Event.class)); - -// Sessionize the events. Track number of events, start and end timestamps of session, and +// Sessionize the events. Track number of events, start and end timestamps of session, // and report session updates. -// -// Step 1: Define the state update function -MapGroupsWithStateFunction stateUpdateFunc = - new MapGroupsWithStateFunction() { -@Override public SessionUpdate call( -String sessionId, Iterator events, GroupState state) { - // If timed out, then remove session and send final update - if (state.hasTimedOut()) { -SessionUpdate finalUpdate = new SessionUpdate( -sessionId, state.get().calculateDuration(), state.get().getNumEvents(), true); -state.remove(); -return finalUpdate; - - } else { -// Find max and min timestamps in events -long maxTimestampMs =
[spark] branch master updated (f90eb6a -> bcc595c)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from f90eb6a  [SPARK-36263][SQL][PYTHON] Add Dataframe.observation to PySpark
     add bcc595c  [SPARK-36310][PYTHON] Fix IndexOpsMixin.hasnans to use isnull().any()

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/base.py                    | 8 +---
 python/pyspark/pandas/tests/indexes/test_base.py | 5 +
 python/pyspark/pandas/tests/test_series.py       | 5 +
 3 files changed, 11 insertions(+), 7 deletions(-)
[spark] branch branch-3.2 updated: [SPARK-36310][PYTHON] Fix IndexOpsMixin.hasnans to use isnull().any()
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 0e9e737 [SPARK-36310][PYTHON] Fix IndexOpsMixin.hasnans to use isnull().any() 0e9e737 is described below commit 0e9e737a8403b73ed19fb35e0f1af7b06b2c7660 Author: Takuya UESHIN AuthorDate: Wed Jul 28 09:21:12 2021 +0900 [SPARK-36310][PYTHON] Fix IndexOpsMixin.hasnans to use isnull().any() ### What changes were proposed in this pull request? Fix `IndexOpsMixin.hasnans` to use `IndexOpsMixin.isnull().any()`. ### Why are the changes needed? `IndexOpsMixin.hasnans` has a potential issue to cause `a window function inside an aggregate function` error. Also it returns a wrong value when the `Series`/`Index` is empty. ```py >>> ps.Series([]).hasnans None ``` whereas: ```py >>> pd.Series([]).hasnans False ``` `IndexOpsMixin.any()` is safe for both cases. ### Does this PR introduce _any_ user-facing change? `IndexOpsMixin.hasnans` will return `False` when empty. ### How was this patch tested? Added some tests. Closes #33547 from ueshin/issues/SPARK-36310/hasnan. Authored-by: Takuya UESHIN Signed-off-by: Hyukjin Kwon (cherry picked from commit bcc595c112a23d8e3024ace50f0dbc7eab7144b2) Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/base.py| 8 +--- python/pyspark/pandas/tests/indexes/test_base.py | 5 + python/pyspark/pandas/tests/test_series.py | 5 + 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py index f547f71..832d7e8 100644 --- a/python/pyspark/pandas/base.py +++ b/python/pyspark/pandas/base.py @@ -522,13 +522,7 @@ class IndexOpsMixin(object, metaclass=ABCMeta): >>> ps.Series([1, 2, 3]).rename("a").to_frame().set_index("a").index.hasnans False """ -sdf = self._internal.spark_frame -scol = self.spark.column - -if isinstance(self.spark.data_type, (DoubleType, FloatType)): -return sdf.select(F.max(scol.isNull() | F.isnan(scol))).collect()[0][0] -else: -return sdf.select(F.max(scol.isNull())).collect()[0][0] +return self.isnull().any() @property def is_monotonic(self) -> bool: diff --git a/python/pyspark/pandas/tests/indexes/test_base.py b/python/pyspark/pandas/tests/indexes/test_base.py index d941322..65831d1 100644 --- a/python/pyspark/pandas/tests/indexes/test_base.py +++ b/python/pyspark/pandas/tests/indexes/test_base.py @@ -1766,6 +1766,11 @@ class IndexesTest(PandasOnSparkTestCase, TestUtils): psser = ps.from_pandas(pser) self.assert_eq(pser.hasnans, psser.hasnans) +# empty +pidx = pd.Index([]) +psidx = ps.from_pandas(pidx) +self.assert_eq(pidx.hasnans, psidx.hasnans) + # Not supported for MultiIndex psmidx = ps.Index([("a", 1), ("b", 2)]) self.assertRaises(NotImplementedError, lambda: psmidx.hasnans()) diff --git a/python/pyspark/pandas/tests/test_series.py b/python/pyspark/pandas/tests/test_series.py index 310e5fb..b42d3cd 100644 --- a/python/pyspark/pandas/tests/test_series.py +++ b/python/pyspark/pandas/tests/test_series.py @@ -2444,6 +2444,11 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils): psser = ps.from_pandas(pser) self.assert_eq(pser.hasnans, psser.hasnans) +# empty +pser = pd.Series([]) +psser = ps.from_pandas(pser) +self.assert_eq(pser.hasnans, psser.hasnans) + def test_last_valid_index(self): pser = pd.Series([250, 1.5, 320, 1, 0.3, None, None, None, None]) psser = ps.from_pandas(pser) - To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
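A short sketch of the behaviour change described above (assuming pandas-on-Spark is available as `ps`):

```python
import pyspark.pandas as ps

ps.Series([1.0, None, 3.0]).hasnans      # True
ps.Series([], dtype="float64").hasnans   # False (an empty Series previously returned None)
ps.Index([1, 2, 3]).hasnans              # False
```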
[spark] branch master updated (df98d5b -> f90eb6a)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from df98d5b  [SPARK-34249][DOCS] Add documentation for ANSI implicit cast rules
     add f90eb6a  [SPARK-36263][SQL][PYTHON] Add Dataframe.observation to PySpark

No new revisions were added by this update.

Summary of changes:
 dev/sparktestsupport/modules.py                |   1 +
 python/docs/source/reference/pyspark.sql.rst   |  13 ++
 python/pyspark/sql/__init__.py                 |   3 +-
 python/pyspark/sql/__init__.pyi                |   1 +
 python/pyspark/sql/dataframe.py                |  41 +-
 python/pyspark/sql/dataframe.pyi               |   2 +
 python/pyspark/sql/observation.py              | 146 +
 .../information.pyi => sql/observation.pyi}    |  15 ++-
 python/pyspark/sql/tests/test_dataframe.py     |  49 +++
 9 files changed, 263 insertions(+), 8 deletions(-)
 create mode 100644 python/pyspark/sql/observation.py
 copy python/pyspark/{resource/information.pyi => sql/observation.pyi} (77%)
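The newly added pyspark.sql.Observation can be used roughly as follows (a sketch based on the file list above and the DataFrame.observe API, not code from the commit; the sample data and metric names are assumptions). It collects named metrics as a side effect of a single action on the observed DataFrame:

```python
from pyspark.sql import SparkSession, Observation
from pyspark.sql.functions import count, lit, max as max_

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["id", "value"])

observation = Observation("stats")
observed = df.observe(observation, count(lit(1)).alias("rows"), max_("value").alias("max_value"))

observed.collect()       # metrics are produced as a side effect of a single action
print(observation.get)   # e.g. {'rows': 3, 'max_value': 30}
```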
[spark] branch branch-3.0 updated (6740d07 -> 1061c9f)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 6740d07  [SPARK-36269][SQL] Fix only set data columns to Hive column names config
     add 1061c9f  [SPARK-36242][CORE][3.0] Ensure spill file closed before set success = true in ExternalSorter.spillMemoryIteratorToDisk method

No new revisions were added by this update.

Summary of changes:
 .../spark/util/collection/ExternalSorter.scala     |   5 +-
 .../util/collection/ExternalSorterSpillSuite.scala | 147 +
 2 files changed, 149 insertions(+), 3 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSpillSuite.scala
[spark] branch branch-3.1 updated (682b306 -> 797b059)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 682b306  [SPARK-36211][PYTHON] Correct typing of `udf` return value
     add 797b059  [SPARK-36242][CORE][3.1] Ensure spill file closed before set success = true in ExternalSorter.spillMemoryIteratorToDisk method

No new revisions were added by this update.

Summary of changes:
 .../spark/util/collection/ExternalSorter.scala     |   5 +-
 .../util/collection/ExternalSorterSpillSuite.scala | 147 +
 2 files changed, 149 insertions(+), 3 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSpillSuite.scala
[spark] branch branch-3.2 updated: [SPARK-34249][DOCS] Add documentation for ANSI implicit cast rules
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new ee3bd71 [SPARK-34249][DOCS] Add documentation for ANSI implicit cast rules ee3bd71 is described below commit ee3bd71c9218120c5146b758d539b092e524d67b Author: Gengliang Wang AuthorDate: Tue Jul 27 20:48:49 2021 +0800 [SPARK-34249][DOCS] Add documentation for ANSI implicit cast rules ### What changes were proposed in this pull request? Add documentation for the ANSI implicit cast rules which are introduced from https://github.com/apache/spark/pull/31349 ### Why are the changes needed? Better documentation. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Build and preview in local: ![image](https://user-images.githubusercontent.com/1097932/127149039-f0cc4766-8eca-4061-bc35-c8e67f009544.png) ![image](https://user-images.githubusercontent.com/1097932/127149072-1b65ef56-65ff-4327-9a5e-450d44719073.png) ![image](https://user-images.githubusercontent.com/1097932/127033375-b4536854-ca72-42fa-8ea9-dde158264aa5.png) ![image](https://user-images.githubusercontent.com/1097932/126950445-435ba521-92b8-44d1-8f2c-250b9efb4b98.png) ![image](https://user-images.githubusercontent.com/1097932/126950495-9aa4e960-60cd-4b20-88d9-b697ff57a7f7.png) Closes #33516 from gengliangwang/addDoc. Lead-authored-by: Gengliang Wang Co-authored-by: Serge Rielau Signed-off-by: Wenchen Fan (cherry picked from commit df98d5b5f13c95b283b446e4f9f26bf1ec3e4d97) Signed-off-by: Wenchen Fan --- docs/img/type-precedence-list.png | Bin 0 -> 133793 bytes docs/sql-ref-ansi-compliance.md | 90 ++ 2 files changed, 81 insertions(+), 9 deletions(-) diff --git a/docs/img/type-precedence-list.png b/docs/img/type-precedence-list.png new file mode 100644 index 000..176d3eb Binary files /dev/null and b/docs/img/type-precedence-list.png differ diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index adaa94c..2001c9f 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -31,9 +31,9 @@ When `spark.sql.storeAssignmentPolicy` is set to `ANSI`, Spark SQL complies with |Property Name|Default|Meaning|Since Version| |-|---|---|-| |`spark.sql.ansi.enabled`|false|(Experimental) When true, Spark tries to conform to the ANSI SQL specification: 1. Spark will throw a runtime exception if an overflow occurs in any operation on integral/decimal field. 2. Spark will forbid using the reserved keywords of ANSI SQL as identifiers in the SQL parser.|3.0.0| -|`spark.sql.storeAssignmentPolicy`|ANSI|(Experimental) When inserting a value into a column with different data type, Spark will perform type coercion. Currently, we support 3 policies for the type coercion rules: ANSI, legacy and strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. In practice, the behavior is mostly the same as PostgreSQL. It disallows certain unreasonable type conversions such as converting string to int or double to boolean. With legacy poli [...] +|`spark.sql.storeAssignmentPolicy`|ANSI|(Experimental) When inserting a value into a column with different data type, Spark will perform type conversion. Currently, we support 3 policies for the type coercion rules: ANSI, legacy and strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. In practice, the behavior is mostly the same as PostgreSQL. 
It disallows certain unreasonable type conversions such as converting string to int or double to boolean. With legacy po [...] -The following subsections present behaviour changes in arithmetic operations, type conversions, and SQL parsing when the ANSI mode enabled. +The following subsections present behaviour changes in arithmetic operations, type conversions, and SQL parsing when the ANSI mode enabled. For type conversions in Spark SQL, there are three kinds of them and this article will introduce them one by one: cast, store assignment and type coercion. ### Arithmetic Operations @@ -66,13 +66,11 @@ SELECT abs(-2147483648); ++ ``` -### Type Conversion +### Cast -Spark SQL has three kinds of type conversions: explicit casting, type coercion, and store assignment casting. When `spark.sql.ansi.enabled` is set to `true`, explicit casting by `CAST` syntax throws a runtime exception for illegal cast patterns defined in the standard, e.g. casts from a string to an integer. -On the other hand, `INSERT INTO` syntax throws an analysis exception when the ANSI mode enabled via `spark.sql.storeAssignmentPolicy=ANSI`. -The type conversion of Spark ANSI
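As a quick illustration of the behaviour the new documentation page describes, here is a minimal PySpark sketch — assuming a local Spark 3.2+ session; the table name `ints` is made up for the example — contrasting an illegal `CAST` under the default and ANSI modes, and a store assignment rejected under `spark.sql.storeAssignmentPolicy=ANSI`:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Default (non-ANSI) behaviour: an illegal string-to-int cast yields NULL.
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("SELECT CAST('abc' AS INT) AS v").show()

# ANSI mode: the same cast raises a runtime error, as the new docs state.
spark.conf.set("spark.sql.ansi.enabled", "true")
try:
    spark.sql("SELECT CAST('abc' AS INT) AS v").show()
except Exception as e:
    print("cast failed under ANSI mode:", type(e).__name__)

# Store assignment: with the ANSI policy, INSERT INTO rejects unreasonable
# conversions (e.g. a string into an int column) at analysis time.
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
spark.sql("CREATE TABLE ints (i INT) USING parquet")
try:
    spark.sql("INSERT INTO ints VALUES ('not a number')")
except Exception as e:
    print("store assignment rejected:", type(e).__name__)
```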
[spark] branch master updated (22ac98d -> df98d5b)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 22ac98d Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package" add df98d5b [SPARK-34249][DOCS] Add documentation for ANSI implicit cast rules No new revisions were added by this update. Summary of changes: docs/img/type-precedence-list.png | Bin 0 -> 133793 bytes docs/sql-ref-ansi-compliance.md | 90 ++ 2 files changed, 81 insertions(+), 9 deletions(-) create mode 100644 docs/img/type-precedence-list.png - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.2 updated (dcd37f9 -> 3d86128)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git. from dcd37f9 Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package" add 3d86128 [SPARK-34619][SQL][DOCS][3.2] Describe ANSI interval types at the Data types page of the SQL reference No new revisions were added by this update. Summary of changes: docs/sql-ref-datatypes.md | 38 ++ 1 file changed, 38 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.2 updated: Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package"
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new dcd37f9 Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package" dcd37f9 is described below commit dcd37f963906fd57a706ea25cb5893be2559d788 Author: Liang-Chi Hsieh AuthorDate: Tue Jul 27 19:11:42 2021 +0900 Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package" This reverts commit 634f96dde40639df5a2ef246884bedbd48b3dc69. Closes #33533 from viirya/revert-SPARK-36136. Authored-by: Liang-Chi Hsieh Signed-off-by: Hyukjin Kwon (cherry picked from commit 22ac98dcbf48575af7912dab2583e38a2a1b751d) Signed-off-by: Hyukjin Kwon --- .../PruneFileSourcePartitionsSuite.scala | 61 -- .../execution/PruneHiveTablePartitionsSuite.scala | 9 +--- .../hive/execution}/PrunePartitionSuiteBase.scala | 17 +++--- 3 files changed, 41 insertions(+), 46 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala similarity index 80% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 510281a..a669b80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.sql.hive.execution import org.scalatest.matchers.should.Matchers._ @@ -24,19 +24,18 @@ import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.functions.broadcast import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType -class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with SharedSparkSession { +class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase { override def format: String = "parquet" @@ -46,27 +45,35 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared test("PruneFileSourcePartitions should not change the output of LogicalRelation") { withTable("test") { - spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("test") - val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") - val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) - - val dataSchema = StructType(tableMeta.schema.filterNot { f => -tableMeta.partitionColumnNames.contains(f.name) - }) - val relation = HadoopFsRelation( -location = catalogFileIndex, -partitionSchema = tableMeta.partitionSchema, -dataSchema = dataSchema, -bucketSpec = None, -fileFormat = new ParquetFileFormat(), -options = Map.empty)(sparkSession = spark) - - val logicalRelation = LogicalRelation(relation, tableMeta) - val query = Project(Seq(Symbol("id"), Symbol("p")), -Filter(Symbol("p") === 1, logicalRelation)).analyze - - val optimized = Optimize.execute(query) - assert(optimized.missingInput.isEmpty) + withTempDir { dir => +sql( + s""" +|CREATE EXTERNAL TABLE test(i int) +|PARTITIONED BY (p int) +|STORED AS parquet +|LOCATION '${dir.toURI}'""".stripMargin) + +val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") +val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) + +val dataSchema =
[spark] branch master updated (f483796 -> 22ac98d)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from f483796 [SPARK-34619][SQL][DOCS] Describe ANSI interval types at the `Data types` page of the SQL reference add 22ac98d Revert "[SPARK-36136][SQL][TESTS] Refactor PruneFileSourcePartitionsSuite etc to a different package" No new revisions were added by this update. Summary of changes: .../PruneFileSourcePartitionsSuite.scala | 61 -- .../execution/PruneHiveTablePartitionsSuite.scala | 9 +--- .../hive/execution}/PrunePartitionSuiteBase.scala | 17 +++--- 3 files changed, 41 insertions(+), 46 deletions(-) rename sql/{core/src/test/scala/org/apache/spark/sql/execution/datasources => hive/src/test/scala/org/apache/spark/sql/hive/execution}/PruneFileSourcePartitionsSuite.scala (80%) rename sql/{core/src/test/scala/org/apache/spark/sql/execution/datasources => hive/src/test/scala/org/apache/spark/sql/hive/execution}/PrunePartitionSuiteBase.scala (90%) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-34619][SQL][DOCS] Describe ANSI interval types at the `Data types` page of the SQL reference
This is an automated email from the ASF dual-hosted git repository. sarutak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f483796 [SPARK-34619][SQL][DOCS] Describe ANSI interval types at the `Data types` page of the SQL reference f483796 is described below commit f4837961a9c4c35eaf71406c22874984b454e8fd Author: Max Gekk AuthorDate: Tue Jul 27 19:05:39 2021 +0900 [SPARK-34619][SQL][DOCS] Describe ANSI interval types at the `Data types` page of the SQL reference ### What changes were proposed in this pull request? In the PR, I propose to update the page https://spark.apache.org/docs/latest/sql-ref-datatypes.html and add information about the year-month and day-time interval types introduced by SPARK-27790. https://user-images.githubusercontent.com/1580697/127115289-e633ca3a-2c18-49a0-a7c0-22421ae5c363.png;> ### Why are the changes needed? To inform users about new ANSI interval types, and improve UX with Spark SQL. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Should be tested by a GitHub action. Closes #33518 from MaxGekk/doc-interval-types. Authored-by: Max Gekk Signed-off-by: Kousuke Saruta --- docs/sql-ref-datatypes.md | 38 ++ 1 file changed, 38 insertions(+) diff --git a/docs/sql-ref-datatypes.md b/docs/sql-ref-datatypes.md index ce22d92..89ffa34 100644 --- a/docs/sql-ref-datatypes.md +++ b/docs/sql-ref-datatypes.md @@ -49,6 +49,44 @@ Spark SQL and DataFrames support the following data types: absolute point in time. - `DateType`: Represents values comprising values of fields year, month and day, without a time-zone. +* Interval types + - `YearMonthIntervalType(startField, endField)`: Represents a year-month interval which is made up of a contiguous subset of the following fields: +- MONTH, months within years `[0..11]`, +- YEAR, years in the range `[0..178956970]`. + +Individual interval fields are non-negative, but an interval itself can have a sign, and be negative. + +`startField` is the leftmost field, and `endField` is the rightmost field of the type. Valid values of `startField` and `endField` are 0(MONTH) and 1(YEAR). Supported year-month interval types are: + +|Year-Month Interval Type|SQL type|An instance of the type| +|-||---| +|`YearMonthIntervalType(YEAR, YEAR)` or `YearMonthIntervalType(YEAR)`|INTERVAL YEAR|`INTERVAL '2021' YEAR`| +|`YearMonthIntervalType(YEAR, MONTH)`|INTERVAL YEAR TO MONTH|`INTERVAL '2021-07' YEAR TO MONTH`| +|`YearMonthIntervalType(MONTH, MONTH)` or `YearMonthIntervalType(MONTH)`|INTERVAL MONTH|`INTERVAL '10' MONTH`| + + - `DayTimeIntervalType(startField, endField)`: Represents a day-time interval which is made up of a contiguous subset of the following fields: +- SECOND, seconds within minutes and possibly fractions of a second `[0..59.99]`, +- MINUTE, minutes within hours `[0..59]`, +- HOUR, hours within days `[0..23]`, +- DAY, days in the range `[0..106751991]`. + +Individual interval fields are non-negative, but an interval itself can have a sign, and be negative. + +`startField` is the leftmost field, and `endField` is the rightmost field of the type. Valid values of `startField` and `endField` are 0 (DAY), 1 (HOUR), 2 (MINUTE), 3 (SECOND). 
Supported day-time interval types are: + +|Day-Time Interval Type|SQL type|An instance of the type| +|-||---| +|`DayTimeIntervalType(DAY, DAY)` or `DayTimeIntervalType(DAY)`|INTERVAL DAY|`INTERVAL '100' DAY`| +|`DayTimeIntervalType(DAY, HOUR)`|INTERVAL DAY TO HOUR|`INTERVAL '100 10' DAY TO HOUR`| +|`DayTimeIntervalType(DAY, MINUTE)`|INTERVAL DAY TO MINUTE|`INTERVAL '100 10:30' DAY TO MINUTE`| +|`DayTimeIntervalType(DAY, SECOND)`|INTERVAL DAY TO SECOND|`INTERVAL '100 10:30:40.99' DAY TO SECOND`| +|`DayTimeIntervalType(HOUR, HOUR)` or `DayTimeIntervalType(HOUR)`|INTERVAL HOUR|`INTERVAL '123' HOUR`| +|`DayTimeIntervalType(HOUR, MINUTE)`|INTERVAL HOUR TO MINUTE|`INTERVAL '123:10' HOUR TO MINUTE`| +|`DayTimeIntervalType(HOUR, SECOND)`|INTERVAL HOUR TO SECOND|`INTERVAL '123:10:59' HOUR TO SECOND`| +|`DayTimeIntervalType(MINUTE, MINUTE)` or `DayTimeIntervalType(MINUTE)`|INTERVAL MINUTE|`INTERVAL '1000' MINUTE`| +|`DayTimeIntervalType(MINUTE, SECOND)`|INTERVAL MINUTE TO SECOND|`INTERVAL '1000:01.001' MINUTE TO SECOND`| +|`DayTimeIntervalType(SECOND, SECOND)` or `DayTimeIntervalType(SECOND)`|INTERVAL SECOND|`INTERVAL '1000.01' SECOND`| + * Complex types - `ArrayType(elementType, containsNull)`: Represents values comprising a sequence of elements with the type of `elementType`. `containsNull` is used to indicate if
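For readers of the archive, the following minimal PySpark sketch (assuming a local Spark 3.2+ session; the column aliases are illustrative) exercises interval literals taken from the new documentation tables and shows the resulting types in the schema:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Literal values taken from the examples in the new documentation tables.
df = spark.sql("""
    SELECT
      INTERVAL '2021-07' YEAR TO MONTH          AS ym,
      INTERVAL '100 10:30:40.99' DAY TO SECOND  AS dt
""")

# The schema reports the ANSI interval types,
# e.g. ym: interval year to month, dt: interval day to second.
df.printSchema()

# Interval values participate in datetime arithmetic,
# e.g. a year-month interval can be added to a date.
spark.sql("SELECT DATE'2021-07-27' + INTERVAL '1' YEAR AS next_year").show()
```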
[spark] branch branch-3.2 updated: [SPARK-36241][SQL] Support creating tables with null column
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 91b9de3 [SPARK-36241][SQL] Support creating tables with null column 91b9de3 is described below commit 91b9de3d8036dbc0f14388c61c4fe171a221bbcd Author: Linhong Liu AuthorDate: Tue Jul 27 17:31:52 2021 +0800 [SPARK-36241][SQL] Support creating tables with null column ### What changes were proposed in this pull request? Previously we blocked creating tables with the null column to follow the hive behavior in PR #28833 In this PR, I propose the restore the previous behavior to support the null column in a table. ### Why are the changes needed? For a complex query, it's possible to generate a column with null type. If this happens to the input query of CTAS, the query will fail due to Spark doesn't allow creating a table with null type. From the user's perspective, it’s hard to figure out why the null type column is produced in the complicated query and how to fix it. So removing this constraint is more friendly to users. ### Does this PR introduce _any_ user-facing change? Yes, this reverts the previous behavior change in #28833, for example, below command will success after this PR ```sql CREATE TABLE t (col_1 void, col_2 int) ``` ### How was this patch tested? newly added and existing test cases Closes #33488 from linhongliu-db/SPARK-36241-support-void-column. Authored-by: Linhong Liu Signed-off-by: Wenchen Fan (cherry picked from commit 8e7e14dc0d182cfe136e103d0b2370844ff661de) Signed-off-by: Wenchen Fan --- .../sql/catalyst/analysis/ResolveCatalogs.scala| 10 -- .../sql/catalyst/plans/logical/v2Commands.scala| 2 - .../sql/connector/catalog/CatalogV2Util.scala | 21 +--- .../spark/sql/errors/QueryCompilationErrors.scala | 4 - .../catalyst/analysis/ResolveSessionCatalog.scala | 10 -- .../spark/sql/execution/datasources/rules.scala| 3 - .../org/apache/spark/sql/hive/HiveStrategies.scala | 3 - .../spark/sql/hive/execution/HiveDDLSuite.scala| 116 + 8 files changed, 25 insertions(+), 144 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index d7603fb..c1e3644e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -33,7 +33,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case AlterTableAddColumnsStatement( nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) => - cols.foreach(c => failNullType(c.dataType)) val changes = cols.map { col => TableChange.addColumn( col.name.toArray, @@ -46,7 +45,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case AlterTableReplaceColumnsStatement( nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) => - cols.foreach(c => failNullType(c.dataType)) val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match { case Some(table) => // REPLACE COLUMNS deletes all the existing columns and adds new columns specified. 
@@ -68,7 +66,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case c @ CreateTableStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) => - assertNoNullTypeInSchema(c.tableSchema) CreateV2Table( catalog.asTableCatalog, tbl.asIdentifier, @@ -80,9 +77,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case c @ CreateTableAsSelectStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) => - if (c.asSelect.resolved) { -assertNoNullTypeInSchema(c.asSelect.schema) - } CreateTableAsSelect( catalog.asTableCatalog, tbl.asIdentifier, @@ -95,7 +89,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case c @ ReplaceTableStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) => - assertNoNullTypeInSchema(c.tableSchema) ReplaceTable( catalog.asTableCatalog, tbl.asIdentifier, @@ -107,9 +100,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case c @ ReplaceTableAsSelectStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) => - if (c.asSelect.resolved) { -
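To make the motivation concrete, here is a minimal PySpark sketch — assuming a Spark 3.2 build that contains this change and a Hive-enabled session; the table name `t` matches the PR description, everything else is illustrative — showing how a `NullType` column can appear in a query result and that an explicit `void` column is accepted again:

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[1]")
         .enableHiveSupport()
         .getOrCreate())

# A complex query can silently produce a NullType (SQL name: void) column,
# e.g. via a plain NULL literal in a projection.
spark.sql("SELECT 1 AS id, NULL AS opt_value").printSchema()

# With this change, the example from the PR description is accepted again
# instead of failing analysis because of the void column.
spark.sql("CREATE TABLE t (col_1 void, col_2 int)")
spark.sql("DESCRIBE TABLE t").show()
```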
[spark] branch master updated (674202e -> 8e7e14d)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 674202e [SPARK-36285][INFRA][TESTS] Skip MiMa in PySpark/SparkR/Docker GHA job add 8e7e14d [SPARK-36241][SQL] Support creating tables with null column No new revisions were added by this update. Summary of changes: .../sql/catalyst/analysis/ResolveCatalogs.scala| 10 -- .../sql/catalyst/plans/logical/v2Commands.scala| 2 - .../sql/connector/catalog/CatalogV2Util.scala | 21 +--- .../spark/sql/errors/QueryCompilationErrors.scala | 4 - .../catalyst/analysis/ResolveSessionCatalog.scala | 10 -- .../spark/sql/execution/datasources/rules.scala| 3 - .../org/apache/spark/sql/hive/HiveStrategies.scala | 3 - .../spark/sql/hive/execution/HiveDDLSuite.scala| 116 + 8 files changed, 25 insertions(+), 144 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.2 updated: [SPARK-36285][INFRA][TESTS] Skip MiMa in PySpark/SparkR/Docker GHA job
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new dfa5c4d [SPARK-36285][INFRA][TESTS] Skip MiMa in PySpark/SparkR/Docker GHA job dfa5c4d is described below commit dfa5c4dadc4329e2726c344211bf18b4bdcf4a9b Author: William Hyun AuthorDate: Tue Jul 27 16:47:59 2021 +0900 [SPARK-36285][INFRA][TESTS] Skip MiMa in PySpark/SparkR/Docker GHA job This PR aims to skip MiMa in PySpark/SparkR/Docker GHA job. This will save GHA resource because MiMa is irrelevant to Python. No. Pass the GHA. Closes #33532 from williamhyun/mima. Lead-authored-by: William Hyun Co-authored-by: Dongjoon Hyun Signed-off-by: Hyukjin Kwon (cherry picked from commit 674202e7b6d640ff7d9ee1787ef4e8b3ed822207) Signed-off-by: Hyukjin Kwon --- .github/workflows/build_and_test.yml | 3 +++ dev/run-tests.py | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 73a45df..cfc20ac 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -169,6 +169,7 @@ jobs: GITHUB_PREV_SHA: ${{ github.event.before }} SPARK_LOCAL_IP: localhost SKIP_UNIDOC: true + SKIP_MIMA: true METASPACE_SIZE: 128m steps: - name: Checkout Spark repository @@ -251,6 +252,7 @@ jobs: HIVE_PROFILE: hive2.3 GITHUB_PREV_SHA: ${{ github.event.before }} SPARK_LOCAL_IP: localhost + SKIP_MIMA: true steps: - name: Checkout Spark repository uses: actions/checkout@v2 @@ -622,6 +624,7 @@ jobs: GITHUB_PREV_SHA: ${{ github.event.before }} SPARK_LOCAL_IP: localhost ORACLE_DOCKER_IMAGE_NAME: oracle/database:18.4.0-xe + SKIP_MIMA: true steps: - name: Checkout Spark repository uses: actions/checkout@v2 diff --git a/dev/run-tests.py b/dev/run-tests.py index 59e891c..507846a 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -804,7 +804,8 @@ def main(): # backwards compatibility checks if build_tool == "sbt": # Note: compatibility tests only supported in sbt for now -detect_binary_inop_with_mima(extra_profiles) +if not os.environ.get("SKIP_MIMA"): +detect_binary_inop_with_mima(extra_profiles) # Since we did not build assembly/package before running dev/mima, we need to # do it here because the tests still rely on it; see SPARK-13294 for details. build_spark_assembly_sbt(extra_profiles, should_run_java_style_checks) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
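The new `SKIP_MIMA` flag is a plain environment variable checked by `dev/run-tests.py`, so it can also be used outside GitHub Actions. A hypothetical local invocation (nothing here beyond the env-var guard is taken from the commit) might look like:

```python
import os
import subprocess

# Any non-empty value makes run-tests.py skip the MiMa compatibility check,
# mirroring the `if not os.environ.get("SKIP_MIMA")` guard added in this commit.
env = dict(os.environ, SKIP_MIMA="1")
subprocess.run(["./dev/run-tests.py"], env=env, check=True)
```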
[spark] branch master updated (ede1bc6 -> 674202e)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from ede1bc6 [SPARK-36211][PYTHON] Correct typing of `udf` return value add 674202e [SPARK-36285][INFRA][TESTS] Skip MiMa in PySpark/SparkR/Docker GHA job No new revisions were added by this update. Summary of changes: .github/workflows/build_and_test.yml | 3 +++ dev/run-tests.py | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.1 updated: [SPARK-36211][PYTHON] Correct typing of `udf` return value
This is an automated email from the ASF dual-hosted git repository. zero323 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 682b306 [SPARK-36211][PYTHON] Correct typing of `udf` return value 682b306 is described below commit 682b306f8e189ec7f2b8179b16741adad396 Author: Luran He AuthorDate: Tue Jul 27 09:07:22 2021 +0200 [SPARK-36211][PYTHON] Correct typing of `udf` return value The following code should type-check: ```python3 import uuid import pyspark.sql.functions as F my_udf = F.udf(lambda: str(uuid.uuid4())).asNondeterministic() ``` ### What changes were proposed in this pull request? The `udf` function should return a more specific type. ### Why are the changes needed? Right now, `mypy` will throw spurious errors, such as for the code given above. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This was not tested. Sorry, I am not very familiar with this repo -- are there any typing tests? Closes #33399 from luranhe/patch-1. Lead-authored-by: Luran He Co-authored-by: Luran He Signed-off-by: zero323 (cherry picked from commit ede1bc6b51c23b2d857b497d335b8e7fe3a5e0cc) Signed-off-by: zero323 --- python/pyspark/sql/_typing.pyi | 12 +--- python/pyspark/sql/functions.pyi | 7 --- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/_typing.pyi b/python/pyspark/sql/_typing.pyi index 799a732..1a3bd8f 100644 --- a/python/pyspark/sql/_typing.pyi +++ b/python/pyspark/sql/_typing.pyi @@ -18,6 +18,7 @@ from typing import ( Any, +Callable, List, Optional, Tuple, @@ -30,11 +31,10 @@ import datetime import decimal from pyspark._typing import PrimitiveType -import pyspark.sql.column import pyspark.sql.types from pyspark.sql.column import Column -ColumnOrName = Union[pyspark.sql.column.Column, str] +ColumnOrName = Union[Column, str] DecimalLiteral = decimal.Decimal DateTimeLiteral = Union[datetime.datetime, datetime.date] LiteralType = PrimitiveType @@ -54,4 +54,10 @@ class SupportsClose(Protocol): def close(self, error: Exception) -> None: ... class UserDefinedFunctionLike(Protocol): -def __call__(self, *_: ColumnOrName) -> Column: ... +func: Callable[..., Any] +evalType: int +deterministic: bool +@property +def returnType(self) -> pyspark.sql.types.DataType: ... +def __call__(self, *args: ColumnOrName) -> Column: ... +def asNondeterministic(self) -> UserDefinedFunctionLike: ... diff --git a/python/pyspark/sql/functions.pyi b/python/pyspark/sql/functions.pyi index 5fec6fd..749bcce 100644 --- a/python/pyspark/sql/functions.pyi +++ b/python/pyspark/sql/functions.pyi @@ -22,6 +22,7 @@ from typing import Any, Callable, Dict, List, Optional, Union from pyspark.sql._typing import ( ColumnOrName, DataTypeOrString, +UserDefinedFunctionLike, ) from pyspark.sql.pandas.functions import ( # noqa: F401 pandas_udf as pandas_udf, @@ -346,13 +347,13 @@ def variance(col: ColumnOrName) -> Column: ... @overload def udf( f: Callable[..., Any], returnType: DataTypeOrString = ... -) -> Callable[..., Column]: ... +) -> UserDefinedFunctionLike: ... @overload def udf( f: DataTypeOrString = ..., -) -> Callable[[Callable[..., Any]], Callable[..., Column]]: ... +) -> Callable[[Callable[..., Any]], UserDefinedFunctionLike]: ... @overload def udf( *, returnType: DataTypeOrString = ..., -) -> Callable[[Callable[..., Any]], Callable[..., Column]]: ... +) -> Callable[[Callable[..., Any]], UserDefinedFunctionLike]: ... 
- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.2 updated: [SPARK-36211][PYTHON] Correct typing of `udf` return value
This is an automated email from the ASF dual-hosted git repository. zero323 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 8a3b1cd [SPARK-36211][PYTHON] Correct typing of `udf` return value 8a3b1cd is described below commit 8a3b1cd811a6c998746fdd916b7f5ac35498d292 Author: Luran He AuthorDate: Tue Jul 27 09:07:22 2021 +0200 [SPARK-36211][PYTHON] Correct typing of `udf` return value The following code should type-check: ```python3 import uuid import pyspark.sql.functions as F my_udf = F.udf(lambda: str(uuid.uuid4())).asNondeterministic() ``` ### What changes were proposed in this pull request? The `udf` function should return a more specific type. ### Why are the changes needed? Right now, `mypy` will throw spurious errors, such as for the code given above. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This was not tested. Sorry, I am not very familiar with this repo -- are there any typing tests? Closes #33399 from luranhe/patch-1. Lead-authored-by: Luran He Co-authored-by: Luran He Signed-off-by: zero323 (cherry picked from commit ede1bc6b51c23b2d857b497d335b8e7fe3a5e0cc) Signed-off-by: zero323 --- python/pyspark/sql/_typing.pyi | 12 +--- python/pyspark/sql/functions.pyi | 7 --- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/_typing.pyi b/python/pyspark/sql/_typing.pyi index 799a732..1a3bd8f 100644 --- a/python/pyspark/sql/_typing.pyi +++ b/python/pyspark/sql/_typing.pyi @@ -18,6 +18,7 @@ from typing import ( Any, +Callable, List, Optional, Tuple, @@ -30,11 +31,10 @@ import datetime import decimal from pyspark._typing import PrimitiveType -import pyspark.sql.column import pyspark.sql.types from pyspark.sql.column import Column -ColumnOrName = Union[pyspark.sql.column.Column, str] +ColumnOrName = Union[Column, str] DecimalLiteral = decimal.Decimal DateTimeLiteral = Union[datetime.datetime, datetime.date] LiteralType = PrimitiveType @@ -54,4 +54,10 @@ class SupportsClose(Protocol): def close(self, error: Exception) -> None: ... class UserDefinedFunctionLike(Protocol): -def __call__(self, *_: ColumnOrName) -> Column: ... +func: Callable[..., Any] +evalType: int +deterministic: bool +@property +def returnType(self) -> pyspark.sql.types.DataType: ... +def __call__(self, *args: ColumnOrName) -> Column: ... +def asNondeterministic(self) -> UserDefinedFunctionLike: ... diff --git a/python/pyspark/sql/functions.pyi b/python/pyspark/sql/functions.pyi index 051a6f1..8342e65 100644 --- a/python/pyspark/sql/functions.pyi +++ b/python/pyspark/sql/functions.pyi @@ -22,6 +22,7 @@ from typing import Any, Callable, Dict, List, Optional, Union from pyspark.sql._typing import ( ColumnOrName, DataTypeOrString, +UserDefinedFunctionLike, ) from pyspark.sql.pandas.functions import ( # noqa: F401 pandas_udf as pandas_udf, @@ -359,13 +360,13 @@ def variance(col: ColumnOrName) -> Column: ... @overload def udf( f: Callable[..., Any], returnType: DataTypeOrString = ... -) -> Callable[..., Column]: ... +) -> UserDefinedFunctionLike: ... @overload def udf( f: DataTypeOrString = ..., -) -> Callable[[Callable[..., Any]], Callable[..., Column]]: ... +) -> Callable[[Callable[..., Any]], UserDefinedFunctionLike]: ... @overload def udf( *, returnType: DataTypeOrString = ..., -) -> Callable[[Callable[..., Any]], Callable[..., Column]]: ... +) -> Callable[[Callable[..., Any]], UserDefinedFunctionLike]: ... 
- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-36211][PYTHON] Correct typing of `udf` return value
This is an automated email from the ASF dual-hosted git repository. zero323 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ede1bc6 [SPARK-36211][PYTHON] Correct typing of `udf` return value ede1bc6 is described below commit ede1bc6b51c23b2d857b497d335b8e7fe3a5e0cc Author: Luran He AuthorDate: Tue Jul 27 09:07:22 2021 +0200 [SPARK-36211][PYTHON] Correct typing of `udf` return value The following code should type-check: ```python3 import uuid import pyspark.sql.functions as F my_udf = F.udf(lambda: str(uuid.uuid4())).asNondeterministic() ``` ### What changes were proposed in this pull request? The `udf` function should return a more specific type. ### Why are the changes needed? Right now, `mypy` will throw spurious errors, such as for the code given above. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This was not tested. Sorry, I am not very familiar with this repo -- are there any typing tests? Closes #33399 from luranhe/patch-1. Lead-authored-by: Luran He Co-authored-by: Luran He Signed-off-by: zero323 --- python/pyspark/sql/_typing.pyi | 12 +--- python/pyspark/sql/functions.pyi | 7 --- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/_typing.pyi b/python/pyspark/sql/_typing.pyi index 799a732..1a3bd8f 100644 --- a/python/pyspark/sql/_typing.pyi +++ b/python/pyspark/sql/_typing.pyi @@ -18,6 +18,7 @@ from typing import ( Any, +Callable, List, Optional, Tuple, @@ -30,11 +31,10 @@ import datetime import decimal from pyspark._typing import PrimitiveType -import pyspark.sql.column import pyspark.sql.types from pyspark.sql.column import Column -ColumnOrName = Union[pyspark.sql.column.Column, str] +ColumnOrName = Union[Column, str] DecimalLiteral = decimal.Decimal DateTimeLiteral = Union[datetime.datetime, datetime.date] LiteralType = PrimitiveType @@ -54,4 +54,10 @@ class SupportsClose(Protocol): def close(self, error: Exception) -> None: ... class UserDefinedFunctionLike(Protocol): -def __call__(self, *_: ColumnOrName) -> Column: ... +func: Callable[..., Any] +evalType: int +deterministic: bool +@property +def returnType(self) -> pyspark.sql.types.DataType: ... +def __call__(self, *args: ColumnOrName) -> Column: ... +def asNondeterministic(self) -> UserDefinedFunctionLike: ... diff --git a/python/pyspark/sql/functions.pyi b/python/pyspark/sql/functions.pyi index 051a6f1..8342e65 100644 --- a/python/pyspark/sql/functions.pyi +++ b/python/pyspark/sql/functions.pyi @@ -22,6 +22,7 @@ from typing import Any, Callable, Dict, List, Optional, Union from pyspark.sql._typing import ( ColumnOrName, DataTypeOrString, +UserDefinedFunctionLike, ) from pyspark.sql.pandas.functions import ( # noqa: F401 pandas_udf as pandas_udf, @@ -359,13 +360,13 @@ def variance(col: ColumnOrName) -> Column: ... @overload def udf( f: Callable[..., Any], returnType: DataTypeOrString = ... -) -> Callable[..., Column]: ... +) -> UserDefinedFunctionLike: ... @overload def udf( f: DataTypeOrString = ..., -) -> Callable[[Callable[..., Any]], Callable[..., Column]]: ... +) -> Callable[[Callable[..., Any]], UserDefinedFunctionLike]: ... @overload def udf( *, returnType: DataTypeOrString = ..., -) -> Callable[[Callable[..., Any]], Callable[..., Column]]: ... +) -> Callable[[Callable[..., Any]], UserDefinedFunctionLike]: ... 
- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
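With the stub changes above, the motivating snippet from the commit message type-checks under mypy, and the object returned by `udf` can be used both as a callable producing a `Column` and through methods such as `asNondeterministic()`. A minimal runnable sketch, assuming a local PySpark session:

```python
import uuid

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Previously annotated as Callable[..., Column], so mypy flagged the
# .asNondeterministic() call; it is now typed as UserDefinedFunctionLike.
my_udf = F.udf(lambda: str(uuid.uuid4())).asNondeterministic()

# Calling the UDF still yields a Column, as before.
spark.range(3).select(my_udf().alias("generated_id")).show(truncate=False)
```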