[spark] branch master updated (f1f856d5463 -> 96bac6c033b)
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from f1f856d5463 [SPARK-45526][PYTHON][DOCS] Improve the example of DataFrameReader/Writer.options to take a dictionary
     add 96bac6c033b [SPARK-45508][CORE] Add "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access Cleaner on Java 9+

No new revisions were added by this update.

Summary of changes:
 common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java  | 7 ++-
 .../src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java   | 7 +++
 .../src/main/java/org/apache/spark/launcher/JavaModuleOptions.java | 1 +
 pom.xml                                                            | 1 +
 project/SparkBuild.scala                                           | 1 +
 5 files changed, 16 insertions(+), 1 deletion(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45526][PYTHON][DOCS] Improve the example of DataFrameReader/Writer.options to take a dictionary
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f1f856d5463 [SPARK-45526][PYTHON][DOCS] Improve the example of DataFrameReader/Writer.options to take a dictionary
f1f856d5463 is described below

commit f1f856d546360d34ca1f7ee1ddc163381586b180
Author: Hyukjin Kwon
AuthorDate: Fri Oct 13 14:23:09 2023 +0900

    [SPARK-45526][PYTHON][DOCS] Improve the example of DataFrameReader/Writer.options to take a dictionary

    ### What changes were proposed in this pull request?

    This PR proposes to add an example of DataFrameReader/Writer.options taking a dictionary.

    ### Why are the changes needed?

    So that users know how to set options with a dictionary in PySpark.

    ### Does this PR introduce _any_ user-facing change?

    Yes, it adds an example of setting the options with a dictionary.

    ### How was this patch tested?

    Existing doctests in this PR's CI.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #43357
    Closes #43358 from HyukjinKwon/SPARK-45528.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/readwriter.py           | 14 ++++++++++++--
 python/pyspark/sql/streaming/readwriter.py | 10 ++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index ea429a75e15..81977c9e8cc 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -220,7 +220,12 @@ class DataFrameReader(OptionUtils):
         Examples
         --------
-        >>> spark.read.option("key", "value")
+        >>> spark.read.options(key="value")
+        <...readwriter.DataFrameReader object ...>
+
+        Specify options in a dictionary.
+
+        >>> spark.read.options(**{"k1": "v1", "k2": "v2"})
         <...readwriter.DataFrameReader object ...>

         Specify the option 'nullValue' and 'header' with reading a CSV file.
@@ -1172,7 +1177,12 @@ class DataFrameWriter(OptionUtils):
         Examples
         --------
-        >>> spark.range(1).write.option("key", "value")
+        >>> spark.range(1).write.options(key="value")
+        <...readwriter.DataFrameWriter object ...>
+
+        Specify options in a dictionary.
+
+        >>> spark.range(1).write.options(**{"k1": "v1", "k2": "v2"})
         <...readwriter.DataFrameWriter object ...>

         Specify the option 'nullValue' and 'header' with writing a CSV file.
diff --git a/python/pyspark/sql/streaming/readwriter.py b/python/pyspark/sql/streaming/readwriter.py
index 2026651ce12..b0f01c06b2e 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -224,6 +224,11 @@ class DataStreamReader(OptionUtils):
         >>> spark.readStream.options(x="1", y=2)
         <...streaming.readwriter.DataStreamReader object ...>

+        Specify options in a dictionary.
+
+        >>> spark.readStream.options(**{"k1": "v1", "k2": "v2"})
+        <...streaming.readwriter.DataStreamReader object ...>
+
         The example below specifies 'rowsPerSecond' and 'numPartitions' options to
         Rate source in order to generate 10 rows with 10 partitions every second.
@@ -943,6 +948,11 @@ class DataStreamWriter:
         >>> df.writeStream.option("x", 1)
         <...streaming.readwriter.DataStreamWriter object ...>

+        Specify options in a dictionary.
+
+        >>> df.writeStream.options(**{"k1": "v1", "k2": "v2"})
+        <...streaming.readwriter.DataStreamWriter object ...>
+
         The example below specifies 'numRows' and 'truncate' options to Console source in order
         to print 3 rows for every batch without truncating the results.
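The dictionary form in the new doctests relies on standard Python keyword-argument unpacking: `options(**d)` expands a dict into keyword arguments, so it is equivalent to spelling each option out. A minimal standalone sketch (the `options` function below is a stand-in for illustration, not the PySpark implementation):

```python
def options(**opts):
    """Stand-in for DataFrameReader.options(): collect keyword options into a dict."""
    return dict(opts)

# Both call styles produce the same option map.
direct = options(k1="v1", k2="v2")
unpacked = options(**{"k1": "v1", "k2": "v2"})  # dict expanded into keyword arguments
```

The dict form is handy when options are built programmatically, e.g. merged from a config file, before being passed to the reader or writer.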
[spark] branch branch-3.5 updated: [SPARK-45498][CORE] Followup: Ignore task completion from old stage a…
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new b5f3dc9e760 [SPARK-45498][CORE] Followup: Ignore task completion from old stage a…
b5f3dc9e760 is described below

commit b5f3dc9e76082a81357555ace0c489df97e6f81a
Author: mayurb
AuthorDate: Fri Oct 13 10:17:56 2023 +0800

    [SPARK-45498][CORE] Followup: Ignore task completion from old stage a…

    ### What changes were proposed in this pull request?

    With [SPARK-45182](https://issues.apache.org/jira/browse/SPARK-45182), we added a fix to prevent laggard tasks from older attempts of an indeterminate stage from marking a partition as completed in the map output tracker. When a task completes, the DAG scheduler also notifies all task sets of the stage that the partition is complete. Task sets then do not schedule such tasks if they are not already scheduled. This is incorrect for an indeterminate stage, since we want to re-run all of its tasks on a re-attempt.

    ### Why are the changes needed?

    Since the partition is not completed by the older attempts and the partition from the newer attempt also does not get scheduled, the stage has to be rescheduled to complete that partition. Since the stage is indeterminate, all of its partitions are then recomputed.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Added a check to an existing unit test.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #43326 from mayurdb/indeterminateFix.
Authored-by: mayurb
Signed-off-by: Wenchen Fan
(cherry picked from commit fb3b707bc1c875c14ff7c6e7a3f39b5c4b852c86)
Signed-off-by: Wenchen Fan
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +++---
 .../test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 5 ++++-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d73bb633901..d8adaae19b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1847,9 +1847,9 @@ private[spark] class DAGScheduler(
     case Success =>
       // An earlier attempt of a stage (which is zombie) may still have running tasks. If these
       // tasks complete, they still count and we can mark the corresponding partitions as
-      // finished. Here we notify the task scheduler to skip running tasks for the same partition,
-      // to save resource.
-      if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
+      // finished if the stage is determinate. Here we notify the task scheduler to skip running
+      // tasks for the same partition to save resource.
+      if (!stage.isIndeterminate && task.stageAttemptId < stage.latestInfo.attemptNumber()) {
         taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
       }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e351f8b95bb..9b7c5d5ace3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3169,13 +3169,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       makeMapStatus("hostB", 2)))

-    // The second task of the shuffle map stage 1 from 1st attempt succeeds
+    // The second task of the shuffle map stage 1 from 1st attempt succeeds
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(1),
       Success,
       makeMapStatus("hostC", 2)))

+    // Above task completion should not mark the partition 1 complete from 2nd attempt
+    assert(!tasksMarkedAsCompleted.contains(taskSets(3).tasks(1)))
+
     // This task completion should get ignored and partition 1 should be missing
     // for shuffle map stage 1
     assert(mapOutputTracker.findMissingPartitions(shuffleId2) == Some(Seq(1)))
[spark] branch master updated: [SPARK-45498][CORE] Followup: Ignore task completion from old stage a…
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new fb3b707bc1c [SPARK-45498][CORE] Followup: Ignore task completion from old stage a…
fb3b707bc1c is described below

commit fb3b707bc1c875c14ff7c6e7a3f39b5c4b852c86
Author: mayurb
AuthorDate: Fri Oct 13 10:17:56 2023 +0800

    [SPARK-45498][CORE] Followup: Ignore task completion from old stage a…

    ### What changes were proposed in this pull request?

    With [SPARK-45182](https://issues.apache.org/jira/browse/SPARK-45182), we added a fix to prevent laggard tasks from older attempts of an indeterminate stage from marking a partition as completed in the map output tracker. When a task completes, the DAG scheduler also notifies all task sets of the stage that the partition is complete. Task sets then do not schedule such tasks if they are not already scheduled. This is incorrect for an indeterminate stage, since we want to re-run all of its tasks on a re-attempt.

    ### Why are the changes needed?

    Since the partition is not completed by the older attempts and the partition from the newer attempt also does not get scheduled, the stage has to be rescheduled to complete that partition. Since the stage is indeterminate, all of its partitions are then recomputed.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Added a check to an existing unit test.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #43326 from mayurdb/indeterminateFix.
Authored-by: mayurb
Signed-off-by: Wenchen Fan
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +++---
 .../test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 5 ++++-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a456f91d4c9..07a71ebed08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1847,9 +1847,9 @@ private[spark] class DAGScheduler(
     case Success =>
       // An earlier attempt of a stage (which is zombie) may still have running tasks. If these
       // tasks complete, they still count and we can mark the corresponding partitions as
-      // finished. Here we notify the task scheduler to skip running tasks for the same partition,
-      // to save resource.
-      if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
+      // finished if the stage is determinate. Here we notify the task scheduler to skip running
+      // tasks for the same partition to save resource.
+      if (!stage.isIndeterminate && task.stageAttemptId < stage.latestInfo.attemptNumber()) {
         taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
       }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7bb8f49e6bf..7691b98f620 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3169,13 +3169,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       makeMapStatus("hostB", 2)))

-    // The second task of the shuffle map stage 1 from 1st attempt succeeds
+    // The second task of the shuffle map stage 1 from 1st attempt succeeds
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(1),
       Success,
       makeMapStatus("hostC", 2)))

+    // Above task completion should not mark the partition 1 complete from 2nd attempt
+    assert(!tasksMarkedAsCompleted.contains(taskSets(3).tasks(1)))
+
     // This task completion should get ignored and partition 1 should be missing
     // for shuffle map stage 1
     assert(mapOutputTracker.findMissingPartitions(shuffleId2) == Some(Seq(1)))
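The guard this commit adds can be summarized as: a completion from an older (zombie) attempt may only short-circuit scheduling for the current attempt when the stage is determinate. A simplified Python sketch of that condition (for illustration only; the real code is the Scala shown in the diff):

```python
def may_skip_rescheduling(stage_is_indeterminate: bool,
                          task_attempt: int,
                          latest_attempt: int) -> bool:
    """Whether a task completion from an older stage attempt may be propagated
    to the current attempt (i.e. the partition is treated as finished).

    Sketch of the DAGScheduler guard: indeterminate stages must re-run every
    task on a re-attempt, so completions from older attempts never count.
    """
    return (not stage_is_indeterminate) and task_attempt < latest_attempt
```

With the old code the `not stage_is_indeterminate` term was missing, so a laggard task from attempt 0 could silently satisfy a partition of attempt 1 even for indeterminate stages, leading to the extra full re-run described in the commit message.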
[spark] branch master updated (f72b87b90be -> 12880c846b5)
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from f72b87b90be [SPARK-45515][CORE][SQL] Use enhanced `switch` expressions to replace the regular `switch` statement
     add 12880c846b5 [SPARK-45266][PYTHON][FOLLOWUP] Fix to resolve UnresolvedPolymorphicPythonUDTF when the table argument is specified as a named argument

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/test_udtf.py              | 272 ++++++++++----------
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   2 +
 2 files changed, 156 insertions(+), 118 deletions(-)
Re: [PR] Add canonical links to the PySpark docs page for published docs [spark-website]
panbingkun commented on PR #482:
URL: https://github.com/apache/spark-website/pull/482#issuecomment-1760614768

> @panbingkun thanks for doing this. However, I discovered that some of the canonical links generated are not a valid URL, for example: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html Is there a way to update this canonical link to the actual latest documentation for groupBy?

Yes, I noticed this too. The cause is not the update logic itself, but the fact that the same document can live at different locations in different versions. For example, in version 3.1.1 the `pyspark.sql.DataFrame.groupBy` page is at:
`https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html?highlight=groupby#pyspark.sql.DataFrame.groupBy`

By the usual logic, its canonical link should be:
`https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html`

But in the new version this document has moved to a different location:
`https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.groupBy.html?highlight=groupby#pyspark.sql.DataFrame.groupBy`

So from the old document's perspective, the canonical link looks incorrect. This problem will always exist: whenever a document's location changes in a new version, the canonical links of the old documents need to be updated in step. We could do this manually, but what do we do the next time the document's location changes? This is a hard problem. There is also a second issue, the disappearance of documents: how should those be handled — by not adding a canonical link at all?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
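The mismatch described in the comment above can be made concrete: a canonical URL derived by substituting "latest" into an old page's own path need not equal the page's real location in the latest docs, because the reference tree was reorganized between versions. A toy illustration (the URLs are taken from the comment; the `naive_canonical` mapping is a hypothetical simplification of the generator's logic):

```python
def naive_canonical(old_url: str) -> str:
    """Hypothetical naive rewrite: replace the version segment after /docs/
    with 'latest', keeping the rest of the old path unchanged."""
    parts = old_url.split("/")
    parts[parts.index("docs") + 1] = "latest"
    return "/".join(parts)

# Location of the groupBy page in the 3.1.1 docs (query fragment omitted).
old = ("https://spark.apache.org/docs/3.1.1/api/python/reference/api/"
       "pyspark.sql.DataFrame.groupBy.html")

# Actual location of the same page in the latest docs: note the extra
# 'pyspark.sql/' directory introduced by the reference reorganization.
actual_latest = ("https://spark.apache.org/docs/latest/api/python/reference/"
                 "pyspark.sql/api/pyspark.sql.DataFrame.groupBy.html")
```

The naive rewrite produces a URL under `reference/api/` that no longer exists in the latest docs, which is exactly the broken canonical link reported in the review.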
[spark] branch master updated: [SPARK-45515][CORE][SQL] Use enhanced `switch` expressions to replace the regular `switch` statement
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f72b87b90be [SPARK-45515][CORE][SQL] Use enhanced `switch` expressions to replace the regular `switch` statement
f72b87b90be is described below

commit f72b87b90bea137050e3e2edceaf962eb7924f13
Author: yangjie01
AuthorDate: Fri Oct 13 08:49:21 2023 +0800

    [SPARK-45515][CORE][SQL] Use enhanced `switch` expressions to replace the regular `switch` statement

    ### What changes were proposed in this pull request?

    This PR uses enhanced `switch` expressions to replace the regular `switch` statements in Spark's Java code, per [JEP 361](https://openjdk.org/jeps/361).

    Example:

    ```java
    double getPrice(String fruit) {
      switch (fruit) {
        case "Apple": return 1.0;
        case "Orange": return 1.5;
        case "Mango": return 2.0;
        default: throw new IllegalArgumentException();
      }
    }
    ```

    can be changed to

    ```java
    double getPrice(String fruit) {
      return switch (fruit) {
        case "Apple" -> 1.0;
        case "Orange" -> 1.5;
        case "Mango" -> 2.0;
        default -> throw new IllegalArgumentException();
      };
    }
    ```

    This PR does not include parts of the `hive-thriftserver` module.

    ### Why are the changes needed?

    Using `JEP 361: Switch Expressions` brings the following benefits:

    1. **More concise syntax**: `switch` can be used as an expression, not just a statement. This makes the code more concise and easier to read.
    2. **Safer**: In `switch` expressions, a forgotten `break` cannot cause unexpected fall-through behavior. The compiler also checks that all possible cases are covered, and reports an error if they are not.
    3. **Easier to understand**: The new `switch` expression syntax is closer to everyday decision-making patterns, making the code easier to understand.

    ### Does this PR introduce _any_ user-facing change?
No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #43349 from LuciferYang/jep-361. Authored-by: yangjie01 Signed-off-by: yangjie01 --- .../org/apache/spark/network/protocol/Message.java | 34 ++--- .../spark/network/protocol/MessageDecoder.java | 59 ++ .../network/server/BlockPushNonFatalFailure.java | 16 +++--- .../org/apache/spark/network/util/DBProvider.java | 13 +++-- .../org/apache/spark/network/util/NettyUtils.java | 36 + .../apache/spark/network/RpcIntegrationSuite.java | 18 +++ .../org/apache/spark/network/StreamTestHelper.java | 24 - .../shuffle/protocol/BlockTransferMessage.java | 42 +++ .../network/shuffle/ExternalBlockHandlerSuite.java | 34 ++--- .../shuffle/ExternalShuffleIntegrationSuite.java | 10 ++-- .../network/yarn/YarnShuffleServiceMetrics.java| 17 ++- .../apache/spark/unsafe/UnsafeAlignedOffset.java | 12 ++--- .../catalyst/expressions/ExpressionImplUtils.java | 23 - .../sql/connector/expressions/NullOrdering.java| 12 ++--- .../sql/connector/expressions/SortDirection.java | 12 ++--- .../sql/connector/util/V2ExpressionSQLBuilder.java | 8 ++- .../datasources/parquet/ParquetColumnVector.java | 12 ++--- .../parquet/VectorizedRleValuesReader.java | 11 ++-- 18 files changed, 155 insertions(+), 238 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/Message.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/Message.java index 12ebee8da96..0bcce788ec4 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/Message.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/Message.java @@ -55,23 +55,23 @@ public interface Message extends Encodable { public static Type decode(ByteBuf buf) { byte id = buf.readByte(); - switch (id) { -case 0: return ChunkFetchRequest; -case 1: return ChunkFetchSuccess; -case 2: return ChunkFetchFailure; -case 3: return 
RpcRequest; -case 4: return RpcResponse; -case 5: return RpcFailure; -case 6: return StreamRequest; -case 7: return StreamResponse; -case 8: return StreamFailure; -case 9: return OneWayMessage; -case 10: return UploadStream; -case 11: return MergedBlockMetaRequest; -case 12: return MergedBlockMetaSuccess; -case -1: throw new IllegalArgumentException("User type messages cannot be
[spark] branch master updated: [SPARK-45521][ML] Avoid re-computation of nnz in `VectorAssembler`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 4df9fa24f56 [SPARK-45521][ML] Avoid re-computation of nnz in `VectorAssembler`
4df9fa24f56 is described below

commit 4df9fa24f56161f7aab08611fa32efc1a89a0ab2
Author: Ruifeng Zheng
AuthorDate: Fri Oct 13 08:37:04 2023 +0800

    [SPARK-45521][ML] Avoid re-computation of nnz in `VectorAssembler`

    ### What changes were proposed in this pull request?

    1. Add a new private `compressed` method that takes a given `nnz`, since we sometimes already know it.
    2. Minor change `Array.range(0, length)` -> `Iterator.range(0, length)` to avoid an array creation.

    ### Why are the changes needed?

    In `VectorAssembler`, the `nnz` is already known before vector construction, so the scan to compute it can be skipped.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    CI

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #43353 from zhengruifeng/ml_vec_opt.

    Authored-by: Ruifeng Zheng
    Signed-off-by: Ruifeng Zheng
---
 .../src/main/scala/org/apache/spark/ml/linalg/Vectors.scala      | 5 +++--
 .../main/scala/org/apache/spark/ml/feature/VectorAssembler.scala | 8 +++++---
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 985f67fc3c3..827ca3f8b9d 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -184,8 +184,9 @@ sealed trait Vector extends Serializable {
    * Returns a vector in either dense or sparse format, whichever uses less storage.
    */
   @Since("2.0.0")
-  def compressed: Vector = {
-    val nnz = numNonzeros
+  def compressed: Vector = compressed(numNonzeros)
+
+  private[ml] def compressed(nnz: Int): Vector = {
     // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs 12 * nnz + 20 bytes.
     if (1.5 * (nnz + 1.0) < size) {
       toSparseWithSize(nnz)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
index 7bc5e56aaeb..761352e34a3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
@@ -279,8 +279,8 @@ object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
           featureIndex += vec.size
         case null =>
           if (keepInvalid) {
-            val length: Int = lengths(inputColumnIndex)
-            Array.range(0, length).foreach { i =>
+            val length = lengths(inputColumnIndex)
+            Iterator.range(0, length).foreach { i =>
               indices += featureIndex + i
               values += Double.NaN
             }
@@ -295,6 +295,8 @@ object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
       case o =>
         throw new SparkException(s"$o of type ${o.getClass.getName} is not supported.")
     }
-    Vectors.sparse(featureIndex, indices.result(), values.result()).compressed
+
+    val (idxArray, valArray) = (indices.result(), values.result())
+    Vectors.sparse(featureIndex, idxArray, valArray).compressed(idxArray.length)
  }
}
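The `compressed` heuristic the diff builds on chooses the cheaper representation from the byte costs stated in the Scala comment: a dense vector costs `8 * size + 8` bytes, a sparse one `12 * nnz + 20` bytes, and the `1.5 * (nnz + 1.0) < size` test approximates their crossover. The commit's point is that when `nnz` is already known (as in `VectorAssembler`, where the sparse index array length *is* the nnz), the O(size) scan in `numNonzeros` can be skipped. A Python sketch of just the decision rule, mirroring the condition in the diff:

```python
def compressed_format(size: int, nnz: int) -> str:
    """Mirror of the Vector.compressed heuristic from the diff above.

    Per the Scala comment, dense storage costs 8 * size + 8 bytes and sparse
    storage 12 * nnz + 20 bytes; the 1.5 factor below approximates the point
    where those two costs cross over.
    """
    return "sparse" if 1.5 * (nnz + 1.0) < size else "dense"
```

For a mostly-empty vector the rule picks sparse; once more than roughly two-thirds of the entries are non-zero, dense storage wins.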
[spark] branch master updated: [SPARK-45418][SQL][PYTHON][CONNECT] Change current_database() column alias to current_schema()
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 12638b851f3 [SPARK-45418][SQL][PYTHON][CONNECT] Change current_database() column alias to current_schema()
12638b851f3 is described below

commit 12638b851f37832ed85b63374d7f83dfbb924cd6
Author: Michael Zhang
AuthorDate: Fri Oct 13 08:21:01 2023 +0800

    [SPARK-45418][SQL][PYTHON][CONNECT] Change current_database() column alias to current_schema()

    ### What changes were proposed in this pull request?

    Change the column alias for current_database() to current_schema().

    ### Why are the changes needed?

    To better align with the preferred usage of "schema" rather than "database" in the three-part namespace.

    ### Does this PR introduce _any_ user-facing change?

    Yes, the `current_database()` column alias is now `current_schema()`.

    ### How was this patch tested?

    Unit tests pass.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #43235 from michaelzhan-db/SPARK-45418.
Authored-by: Michael Zhang Signed-off-by: Wenchen Fan --- .../function_current_database.explain| 2 +- .../explain-results/function_current_schema.explain | 2 +- python/pyspark/sql/functions.py | 20 ++-- .../apache/spark/sql/catalyst/expressions/misc.scala | 2 +- .../resources/sql-functions/sql-expression-schema.md | 4 ++-- .../current_database_catalog.sql.out | 2 +- .../analyzer-results/sql-session-variables.sql.out | 2 +- .../results/current_database_catalog.sql.out | 2 +- 8 files changed, 18 insertions(+), 18 deletions(-) diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_database.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_database.explain index 93dfac524d9..481c0a478c8 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_database.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_database.explain @@ -1,2 +1,2 @@ -Project [current_database() AS current_database()#0] +Project [current_schema() AS current_schema()#0] +- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_schema.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_schema.explain index 93dfac524d9..481c0a478c8 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_schema.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_schema.explain @@ -1,2 +1,2 @@ -Project [current_database() AS current_database()#0] +Project [current_schema() AS current_schema()#0] +- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 25958bdf15d..31e5884e9eb 100644 --- 
a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -8719,11 +8719,11 @@ def current_database() -> Column: Examples >>> spark.range(1).select(current_database()).show() -+--+ -|current_database()| -+--+ -| default| -+--+ +++ +|current_schema()| +++ +| default| +++ """ return _invoke_function("current_database") @@ -8738,11 +8738,11 @@ def current_schema() -> Column: >>> import pyspark.sql.functions as sf >>> spark.range(1).select(sf.current_schema()).show() -+--+ -|current_database()| -+--+ -| default| -+--+ +++ +|current_schema()| +++ +| default| +++ """ return _invoke_function("current_schema") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index 4a54ccf4a31..60bf5c603d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -189,7 +189,7 @@ object AssertTrue { case class CurrentDatabase() extends LeafExpression with Unevaluable { override def dataType: DataType = StringType override def nullable: Boolean = false - override def prettyName: String = "current_database" +
[spark] branch master updated: [SPARK-45505][PYTHON] Refactor analyzeInPython to make it reusable
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 280f6b33110d [SPARK-45505][PYTHON] Refactor analyzeInPython to make it reusable
280f6b33110d is described below

commit 280f6b33110d707ebee6fec6e5bafa45b45213ae
Author: allisonwang-db
AuthorDate: Thu Oct 12 17:02:41 2023 -0700

    [SPARK-45505][PYTHON] Refactor analyzeInPython to make it reusable

    ### What changes were proposed in this pull request?

    Currently, the `analyzeInPython` method in the UserDefinedPythonTableFunction object can start a Python process on the driver and run a Python function in that process. This PR refactors that logic into a reusable runner class.

    ### Why are the changes needed?

    To make the code more reusable.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #43340 from allisonwang-db/spark-45505-refactor-analyze-in-py.

    Authored-by: allisonwang-db
    Signed-off-by: Takuya UESHIN
---
 python/pyspark/sql/worker/analyze_udtf.py          |   6 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  18 +-
 .../sql/execution/python/PythonPlannerRunner.scala | 177 ++++++++
 .../python/UserDefinedPythonFunction.scala         | 321 +++++++--------
 4 files changed, 286 insertions(+), 236 deletions(-)

diff --git a/python/pyspark/sql/worker/analyze_udtf.py b/python/pyspark/sql/worker/analyze_udtf.py
index a6aa381eb14a..9e84b880fc96 100644
--- a/python/pyspark/sql/worker/analyze_udtf.py
+++ b/python/pyspark/sql/worker/analyze_udtf.py
@@ -98,14 +98,14 @@ def main(infile: IO, outfile: IO) -> None:
     """
     Runs the Python UDTF's `analyze` static method.
-This process will be invoked from `UserDefinedPythonTableFunction.analyzeInPython` in JVM -and receive the Python UDTF and its arguments for the `analyze` static method, +This process will be invoked from `UserDefinedPythonTableFunctionAnalyzeRunner.runInPython` +in JVM and receive the Python UDTF and its arguments for the `analyze` static method, and call the `analyze` static method, and send back a AnalyzeResult as a result of the method. """ try: check_python_version(infile) -memory_limit_mb = int(os.environ.get("PYSPARK_UDTF_ANALYZER_MEMORY_MB", "-1")) +memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1")) setup_memory_limits(memory_limit_mb) setup_spark_files(infile) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 12ec9e911d31..000694f6f1bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3008,14 +3008,14 @@ object SQLConf { .booleanConf .createWithDefault(false) - val PYTHON_TABLE_UDF_ANALYZER_MEMORY = -buildConf("spark.sql.analyzer.pythonUDTF.analyzeInPython.memory") - .doc("The amount of memory to be allocated to PySpark for Python UDTF analyzer, in MiB " + -"unless otherwise specified. If set, PySpark memory for Python UDTF analyzer will be " + -"limited to this amount. If not set, Spark will not limit Python's " + -"memory use and it is up to the application to avoid exceeding the overhead memory space " + -"shared with other non-JVM processes.\nNote: Windows does not support resource limiting " + -"and actual resource is not limited on MacOS.") + val PYTHON_PLANNER_EXEC_MEMORY = +buildConf("spark.sql.planner.pythonExecution.memory") + .doc("Specifies the memory allocation for executing Python code in Spark driver, in MiB. " + +"When set, it caps the memory for Python execution to the specified amount. 
" + +"If not set, Spark will not limit Python's memory usage and it is up to the application " + +"to avoid exceeding the overhead memory space shared with other non-JVM processes.\n" + +"Note: Windows does not support resource limiting and actual resource is not limited " + +"on MacOS.") .version("4.0.0") .bytesConf(ByteUnit.MiB) .createOptional @@ -5157,7 +5157,7 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def pysparkWorkerPythonExecutable: Option[String] = getConf(SQLConf.PYSPARK_WORKER_PYTHON_EXECUTABLE) - def pythonUDTFAnalyzerMemory: Option[Long] = getConf(PYTHON_TABLE_UDF_ANALYZER_MEMORY) + def pythonPlannerExecMemory: Option[Long] = getConf(PYTHON_PLANNER_EXEC_MEMORY) def
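The renamed environment variable keeps the same convention as the old one: an unset value is treated as -1, meaning "do not cap Python memory". A tiny Java sketch of that default-handling pattern (the variable name `PYSPARK_PLANNER_MEMORY_MB` comes from the diff above; the `EnvDefaultDemo` class and its helper are made up for illustration):

```java
import java.util.Optional;

public class EnvDefaultDemo {
    // Mirrors os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1") from the
    // Python worker: an absent limit is represented as -1 (no cap).
    static int memoryLimitMb(String rawValue) {
        return Integer.parseInt(Optional.ofNullable(rawValue).orElse("-1"));
    }

    public static void main(String[] args) {
        // Reads the real environment; prints -1 when the variable is unset.
        System.out.println(memoryLimitMb(System.getenv("PYSPARK_PLANNER_MEMORY_MB")));
    }
}
```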
[spark] branch master updated: [SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto message
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e720cce1813e [SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto message e720cce1813e is described below commit e720cce1813e384847d4ef0bac48a202b2e39848 Author: Yihong He AuthorDate: Fri Oct 13 08:36:51 2023 +0900 [SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto message ### What changes were proposed in this pull request? - Include QueryContext in SparkThrowable proto message - Reconstruct QueryContext for SparkThrowable exceptions on the client side ### Why are the changes needed? - Better integration with the error framework ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"` ### Was this patch authored or co-authored using generative AI tooling? Closes #43352 from heyihong/SPARK-45516. 
Lead-authored-by: Yihong He Co-authored-by: Yihong He Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/sql/ClientE2ETestSuite.scala | 4 ++ .../src/main/protobuf/spark/connect/base.proto | 28 - .../connect/client/GrpcExceptionConverter.scala| 50 .../spark/sql/connect/utils/ErrorUtils.scala | 11 python/pyspark/sql/connect/proto/base_pb2.py | 22 +++ python/pyspark/sql/connect/proto/base_pb2.pyi | 69 +- 6 files changed, 159 insertions(+), 25 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index 6e0a04cf4eb4..04d284f2ec23 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -129,6 +129,10 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM assert(!ex.messageParameters.isEmpty) assert(ex.getSqlState != null) assert(!ex.isInternalError) +assert(ex.getQueryContext.length == 1) +assert(ex.getQueryContext.head.startIndex() == 7) +assert(ex.getQueryContext.head.stopIndex() == 7) +assert(ex.getQueryContext.head.fragment() == "x") assert( ex.getStackTrace .find(_.getClassName.contains("org.apache.spark.sql.catalyst.analysis.CheckAnalysis")) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto index 5b8858f40d26..273512272225 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto @@ -819,13 +819,39 @@ message FetchErrorDetailsResponse { int32 line_number = 4; } + // QueryContext defines the schema for the query context of a SparkThrowable. + // It helps users understand where the error occurs while executing queries. 
+ message QueryContext { +// The object type of the query which throws the exception. +// If the exception is directly from the main query, it should be an empty string. +// Otherwise, it should be the exact object type in upper case. For example, a "VIEW". +string object_type = 1; + +// The object name of the query which throws the exception. +// If the exception is directly from the main query, it should be an empty string. +// Otherwise, it should be the object name. For example, a view name "V1". +string object_name = 2; + +// The starting index in the query text which throws the exception. The index starts from 0. +int32 start_index = 3; + +// The stopping index in the query which throws the exception. The index starts from 0. +int32 stop_index = 4; + +// The corresponding fragment of the query which throws the exception. +string fragment = 5; + } + // SparkThrowable defines the schema for SparkThrowable exceptions. message SparkThrowable { // Succinct, human-readable, unique, and consistent representation of the error category. optional string error_class = 1; -// message parameters for the error framework. +// The message parameters for the error framework. map message_parameters = 2; + +// The query context of a SparkThrowable. +repeated QueryContext query_contexts = 3; } // Error defines the schema for the representing exception. diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
Re: [PR] Add canonical links to the PySpark docs page for published docs [spark-website]
allisonwang-db commented on PR #482: URL: https://github.com/apache/spark-website/pull/482#issuecomment-1760125845 @panbingkun thanks for doing this. However, I discovered that some of the canonical links generated are not a valid URL, for example: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html Is there a way to update this canonical link to the actual latest documentation for groupBy? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[spark] branch master updated: [SPARK-45499][CORE][TESTS][FOLLOWUP] Use `ReferenceQueue#remove` instead of `ReferenceQueue#poll`
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7663fdfa3e84 [SPARK-45499][CORE][TESTS][FOLLOWUP] Use `ReferenceQueue#remove` instead of `ReferenceQueue#poll` 7663fdfa3e84 is described below commit 7663fdfa3e84d7231784c39e4d3445e6f2f079fd Author: yangjie01 AuthorDate: Fri Oct 13 01:24:01 2023 +0800 [SPARK-45499][CORE][TESTS][FOLLOWUP] Use `ReferenceQueue#remove` instead of `ReferenceQueue#poll` ### What changes were proposed in this pull request? This pr replaces `refQueue.poll()` with `refQueue.remove()` in the test case `reference to sub iterator should not be available after completion` to ensure that a `PhantomReference` object can be retrieved from `refQueue`. ### Why are the changes needed? https://github.com/apache/spark/pull/43325 replaces `Reference#isEnqueued` with `Reference#refersTo(null)` to eliminate the use of deprecated APIs. However, there are some differences between `ref.isEnqueued` and `ref.refersTo(null)`. - The `ref.isEnqueued` method is used to check whether this `PhantomReference` object has been added to its reference queue by the garbage collector. When the garbage collector decides to recycle an object, if this object has one or more `PhantomReference`, then these `PhantomReference` will be added to their reference queues. So, if `ref.isEnqueued` returns `true`, it means that this `PhantomReference` has been added to the reference queue, which means that the object it references h [...] - The `ref.refersTo(null)` method is used to check whether this `PhantomReference` object refers to the specified object. In the current code, `ref.refersTo(null)` is used to check whether `ref` still refers to `sub`. 
If `ref.refersTo(null)` returns `true`, it means that `ref` no longer refers to `sub`, which means that `sub` might have been recycled by the garbage collector. But this does not mean that this `ref` has been added to the reference queue. So we can see the following test failure in GA: https://github.com/apache/spark/actions/runs/6484510414/job/17608536854 ``` [info] - reference to sub iterator should not be available after completion *** FAILED *** (287 milliseconds) [info] null did not equal java.lang.ref.PhantomReference@11e8f090 (CompletionIteratorSuite.scala:67) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.util.CompletionIteratorSuite.$anonfun$new$3(CompletionIteratorSuite.scala:67) ``` To solve this issue, this PR replaces `refQueue.poll()` with `refQueue.remove()` to allow for waiting until `ref` is put into `refQueue` and can be retrieved from `refQueue`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #43345 from LuciferYang/ref-remove.
Authored-by: yangjie01 Signed-off-by: yangjie01 --- core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala index 297e4fd53ab4..6153c2c74353 100644 --- a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala @@ -64,6 +64,6 @@ class CompletionIteratorSuite extends SparkFunSuite { } } assert(ref.refersTo(null)) -assert(refQueue.poll() === ref) +assert(refQueue.remove(1000) === ref) } }
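The race the commit message describes — a phantom reference being cleared (`refersTo(null)` returning true) before the garbage collector enqueues it — can be reproduced outside Spark with plain `java.lang.ref`. A minimal, self-contained Java sketch (not Spark code; the `PhantomEnqueueDemo` class name and structure are made up for illustration; `refersTo` requires Java 16+):

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;

public class PhantomEnqueueDemo {
    // Returns true once the phantom reference has been taken off its queue.
    static boolean clearedAndEnqueued() {
        ReferenceQueue<Object> queue = new ReferenceQueue<>();
        Object referent = new Object();
        PhantomReference<Object> ref = new PhantomReference<>(referent, queue);
        referent = null;                  // drop the last strong reference
        try {
            while (!ref.refersTo(null)) { // wait until the GC clears the referent
                System.gc();
                Thread.sleep(10);
            }
            // At this point poll() may still return null: clearing the referent
            // and enqueueing the reference are separate steps. remove(timeout)
            // blocks until the reference is actually enqueued.
            Reference<?> removed = queue.remove(5000);
            return removed == ref;
        } catch (InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(clearedAndEnqueued());
    }
}
```

Swapping `queue.remove(5000)` for `queue.poll()` right after the loop occasionally yields `null`, which is exactly the flake the test fix addresses.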
[spark] branch master updated: [SPARK-45501][CORE][SQL] Use pattern matching for type checking and conversion
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b0576fff9b72 [SPARK-45501][CORE][SQL] Use pattern matching for type checking and conversion b0576fff9b72 is described below commit b0576fff9b72880cd81a9d22c044dec329bc67d0 Author: yangjie01 AuthorDate: Thu Oct 12 23:53:06 2023 +0800 [SPARK-45501][CORE][SQL] Use pattern matching for type checking and conversion ### What changes were proposed in this pull request? This pr change to use pattern matching for type checking and conversion instead of the explicit type casting statement in Java code. The change refer to [JEP 394](https://openjdk.org/jeps/394), and this pr does not include parts of the `hive-thriftserver` module. Example: ```java if (obj instanceof String) { String str = (String) obj; System.out.println(str); } ``` Can be replaced with ```java if (obj instanceof String str) { System.out.println(str); } ``` ### Why are the changes needed? Using `JEP 394: Pattern Matching for instanceof` can bring the following benefits: 1. **Code conciseness**: By eliminating explicit type conversion and redundant variable declarations, the code becomes more concise and easy to read. 2. **Improved safety**: In the past, explicit type conversion was required, and if accidentally converted to the wrong type, a `ClassCastException` would be thrown at runtime. Now, as type checking and type conversion occur in the same step, such errors are no longer possible. 3. **Better semantics**: Previously, instanceof and type casting were two independent steps, which could lead to unclear code intentions. Now, these two steps are merged into one, making the intentions of the code clearer. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? 
Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #43327 from LuciferYang/jep-394. Authored-by: yangjie01 Signed-off-by: yangjie01 --- .../spark/util/kvstore/ArrayKeyIndexType.java | 3 +-- .../org/apache/spark/util/kvstore/CustomType1.java | 3 +-- .../org/apache/spark/util/kvstore/CustomType2.java | 3 +-- .../org/apache/spark/util/kvstore/IntKeyType.java | 3 +-- .../spark/network/protocol/ChunkFetchFailure.java | 3 +-- .../spark/network/protocol/ChunkFetchRequest.java | 3 +-- .../spark/network/protocol/ChunkFetchSuccess.java | 3 +-- .../network/protocol/MergedBlockMetaRequest.java | 3 +-- .../spark/network/protocol/MessageEncoder.java | 3 +-- .../spark/network/protocol/OneWayMessage.java | 3 +-- .../apache/spark/network/protocol/RpcFailure.java | 3 +-- .../apache/spark/network/protocol/RpcRequest.java | 3 +-- .../apache/spark/network/protocol/RpcResponse.java | 3 +-- .../spark/network/protocol/StreamChunkId.java | 3 +-- .../spark/network/protocol/StreamFailure.java | 3 +-- .../spark/network/protocol/StreamRequest.java | 3 +-- .../spark/network/protocol/StreamResponse.java | 3 +-- .../spark/network/protocol/UploadStream.java | 3 +-- .../apache/spark/network/sasl/SparkSaslClient.java | 9 +++- .../apache/spark/network/sasl/SparkSaslServer.java | 12 -- .../network/server/TransportChannelHandler.java| 3 +-- .../network/shuffle/ExternalBlockHandler.java | 3 +-- .../network/shuffle/ShuffleTransportContext.java | 6 ++--- .../shuffle/protocol/BlockPushReturnCode.java | 3 +-- .../network/shuffle/protocol/BlocksRemoved.java| 3 +-- .../shuffle/protocol/ExecutorShuffleInfo.java | 3 +-- .../shuffle/protocol/FinalizeShuffleMerge.java | 3 +-- .../shuffle/protocol/GetLocalDirsForExecutors.java | 3 +-- .../shuffle/protocol/LocalDirsForExecutors.java| 3 +-- .../network/shuffle/protocol/MergeStatuses.java| 3 +-- .../spark/network/shuffle/protocol/OpenBlocks.java | 3 +-- .../network/shuffle/protocol/PushBlockStream.java | 3 +-- 
.../network/shuffle/protocol/RegisterExecutor.java | 3 +-- .../network/shuffle/protocol/RemoveBlocks.java | 3 +-- .../shuffle/protocol/RemoveShuffleMerge.java | 3 +-- .../network/shuffle/protocol/StreamHandle.java | 3 +-- .../network/shuffle/protocol/UploadBlock.java | 3 +-- .../shuffle/protocol/UploadBlockStream.java| 3 +-- .../network/yarn/YarnShuffleServiceMetrics.java| 9 +++- .../org/apache/spark/util/sketch/BitArray.java | 3 +-- .../apache/spark/util/sketch/BloomFilterImpl.java | 11 +++-- .../spark/util/sketch/CountMinSketchImpl.java | 8 ++- .../org/apache/spark/unsafe/types/UTF8String.java | 6 ++---
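The before/after snippets in the commit message can be made concrete as a runnable pair; a small Java sketch contrasting the two styles (the `PatternMatchDemo` class and method names are made up for illustration; the pattern form requires Java 16+, per JEP 394):

```java
public class PatternMatchDemo {
    // Old style: a separate instanceof check followed by an explicit cast.
    static int lengthOldStyle(Object obj) {
        if (obj instanceof String) {
            String s = (String) obj;
            return s.length();
        }
        return -1;
    }

    // JEP 394 style: the type check and the binding happen in one step,
    // so a mismatched cast can no longer slip through.
    static int lengthPattern(Object obj) {
        if (obj instanceof String s) {
            return s.length();
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(lengthPattern("spark")); // prints 5
        System.out.println(lengthPattern(42));      // prints -1
    }
}
```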
[spark] branch master updated: [SPARK-45502][BUILD] Upgrade Kafka to 3.6.0
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d1bd21a2a219 [SPARK-45502][BUILD] Upgrade Kafka to 3.6.0 d1bd21a2a219 is described below commit d1bd21a2a219ebe6c5ac3fcb1e17db75af3c670c Author: dengziming AuthorDate: Thu Oct 12 08:47:25 2023 -0700 [SPARK-45502][BUILD] Upgrade Kafka to 3.6.0 ### What changes were proposed in this pull request? Upgrade Apache Kafka from 3.4.1 to 3.6.0 ### Why are the changes needed? - https://downloads.apache.org/kafka/3.6.0/RELEASE_NOTES.html - https://downloads.apache.org/kafka/3.5.1/RELEASE_NOTES.html - https://archive.apache.org/dist/kafka/3.5.0/RELEASE_NOTES.html ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GitHub CI. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43348 from dengziming/kafka-3.6.0. 
Authored-by: dengziming Signed-off-by: Dongjoon Hyun --- .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 4 ++-- .../spark/streaming/kafka010/KafkaRDDSuite.scala | 16 -- .../spark/streaming/kafka010/KafkaTestUtils.scala | 4 ++-- .../streaming/kafka010/mocks/MockScheduler.scala | 25 +++--- pom.xml| 2 +- 5 files changed, 26 insertions(+), 25 deletions(-) diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index c54afc6290b1..2b0c13ed443d 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -28,7 +28,6 @@ import scala.io.Source import scala.jdk.CollectionConverters._ import com.google.common.io.Files -import kafka.api.Request import kafka.server.{HostedPartition, KafkaConfig, KafkaServer} import kafka.server.checkpoints.OffsetCheckpointFile import kafka.zk.KafkaZkClient @@ -40,6 +39,7 @@ import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT} import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.utils.SystemTime @@ -597,7 +597,7 @@ class KafkaTestUtils( .getPartitionInfo(topic, partition) match { case Some(partitionState) => zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined && - Request.isValidBrokerId(partitionState.leader) && + FetchRequest.isValidBrokerId(partitionState.leader) && !partitionState.replicas.isEmpty case _ => diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 735ec2f7b448..ae941b1fddd5 100644 --- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -24,12 +24,14 @@ import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ import scala.util.Random -import kafka.log.{CleanerConfig, LogCleaner, LogConfig, ProducerStateManagerConfig, UnifiedLog} -import kafka.server.{BrokerTopicStats, LogDirFailureChannel} +import kafka.log.{LogCleaner, UnifiedLog} +import kafka.server.BrokerTopicStats import kafka.utils.Pool import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} import org.apache.spark._ @@ -90,13 +92,13 @@ class KafkaRDDSuite extends SparkFunSuite { val dir = new File(logDir, topic + "-" + partition) dir.mkdirs() val logProps = new ju.Properties() -logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) -logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f)) +logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) +logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG,
[spark] branch master updated: [SPARK-45132][SQL] Fix IDENTIFIER for function invocation
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f0b2e6da5211 [SPARK-45132][SQL] Fix IDENTIFIER for function invocation f0b2e6da5211 is described below commit f0b2e6da52113802f64f7879f207064d3bdbc7b0 Author: srielau AuthorDate: Thu Oct 12 21:34:49 2023 +0800 [SPARK-45132][SQL] Fix IDENTIFIER for function invocation ### What changes were proposed in this pull request? Due to a quirk in the parser, in some cases, IDENTIFIER()() is not properly recognized as a function invocation. The change is to remove the explicit IDENTIFIER-clause rule in the function invocation grammar and instead recognize IDENTIFIER() within visitFunctionCall. ### Why are the changes needed? Function invocation support for IDENTIFIER is incomplete otherwise ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added new testcases to identifier-clause.sql ### Was this patch authored or co-authored using generative AI tooling? No Closes #42888 from srielau/SPARK-45132. 
Lead-authored-by: srielau Co-authored-by: Wenchen Fan Co-authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/parser/SqlBaseParser.g4 | 2 +- .../spark/sql/catalyst/parser/AstBuilder.scala | 43 -- .../analyzer-results/identifier-clause.sql.out | 28 -- .../sql-tests/inputs/identifier-clause.sql | 3 +- .../sql-tests/results/identifier-clause.sql.out| 27 +- 5 files changed, 77 insertions(+), 26 deletions(-) diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 6a6d39e96ca2..77a9108e0632 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -967,7 +967,6 @@ primaryExpression | qualifiedName DOT ASTERISK #star | LEFT_PAREN namedExpression (COMMA namedExpression)+ RIGHT_PAREN #rowConstructor | LEFT_PAREN query RIGHT_PAREN #subqueryExpression -| IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN #identifierClause | functionName LEFT_PAREN (setQuantifier? argument+=functionArgument (COMMA argument+=functionArgument)*)? RIGHT_PAREN (FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)? @@ -1196,6 +1195,7 @@ qualifiedNameList functionName : IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN +| identFunc=IDENTIFIER_KW // IDENTIFIER itself is also a valid function name. 
| qualifiedName | FILTER | LEFT diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c2bc6e9eb65a..9abca8b95cf7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2246,13 +2246,6 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { } } - /** - * Create an expression for the IDENTIFIER() clause. - */ - override def visitIdentifierClause(ctx: IdentifierClauseContext): Expression = withOrigin(ctx) { -ExpressionWithUnresolvedIdentifier(expression(ctx.expression), UnresolvedAttribute(_)) - } - /** * Create a (windowed) Function expression. */ @@ -2274,19 +2267,31 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { val filter = Option(ctx.where).map(expression(_)) val ignoreNulls = Option(ctx.nullsOption).map(_.getType == SqlBaseParser.IGNORE).getOrElse(false) -val funcCtx = ctx.functionName -val func = withFuncIdentClause( - funcCtx, - ident => UnresolvedFunction(ident, arguments, isDistinct, filter, ignoreNulls) -) -// Check if the function is evaluated in a windowed context. -ctx.windowSpec match { - case spec: WindowRefContext => -UnresolvedWindowExpression(func, visitWindowRef(spec)) - case spec: WindowDefContext => -WindowExpression(func, visitWindowDef(spec)) - case _ => func +// Is this an IDENTIFIER clause instead of a function call? +if (ctx.functionName.identFunc != null && + arguments.length == 1 && // One argument + ctx.setQuantifier == null && // No other clause +
[spark] branch branch-3.5 updated: [SPARK-45132][SQL] Fix IDENTIFIER for function invocation
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 249533bcc8c7 [SPARK-45132][SQL] Fix IDENTIFIER for function invocation 249533bcc8c7 is described below commit 249533bcc8c7fa7f578961ce21d4d7118565dfc1 Author: srielau AuthorDate: Thu Oct 12 21:34:49 2023 +0800 [SPARK-45132][SQL] Fix IDENTIFIER for function invocation ### What changes were proposed in this pull request? Due to a quirk in the parser, in some cases, IDENTIFIER()() is not properly recognized as a function invocation. The change is to remove the explicit IDENTIFIER-clause rule in the function invocation grammar and instead recognize IDENTIFIER() within visitFunctionCall. ### Why are the changes needed? Function invocation support for IDENTIFIER is incomplete otherwise ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added new testcases to identifier-clause.sql ### Was this patch authored or co-authored using generative AI tooling? No Closes #42888 from srielau/SPARK-45132. 
Lead-authored-by: srielau Co-authored-by: Wenchen Fan Co-authored-by: Wenchen Fan Signed-off-by: Wenchen Fan (cherry picked from commit f0b2e6da52113802f64f7879f207064d3bdbc7b0) Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/parser/SqlBaseParser.g4 | 2 +- .../spark/sql/catalyst/parser/AstBuilder.scala | 43 -- .../analyzer-results/identifier-clause.sql.out | 28 -- .../sql-tests/inputs/identifier-clause.sql | 3 +- .../sql-tests/results/identifier-clause.sql.out| 27 +- 5 files changed, 77 insertions(+), 26 deletions(-) diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 85dbc499fbde..04128216be07 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -951,7 +951,6 @@ primaryExpression | qualifiedName DOT ASTERISK #star | LEFT_PAREN namedExpression (COMMA namedExpression)+ RIGHT_PAREN #rowConstructor | LEFT_PAREN query RIGHT_PAREN #subqueryExpression -| IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN #identifierClause | functionName LEFT_PAREN (setQuantifier? argument+=functionArgument (COMMA argument+=functionArgument)*)? RIGHT_PAREN (FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)? @@ -1176,6 +1175,7 @@ qualifiedNameList functionName : IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN +| identFunc=IDENTIFIER_KW // IDENTIFIER itself is also a valid function name. 
| qualifiedName | FILTER | LEFT diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 83938632e534..b80ea8fddcfe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2223,13 +2223,6 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { } } - /** - * Create an expression for the IDENTIFIER() clause. - */ - override def visitIdentifierClause(ctx: IdentifierClauseContext): Expression = withOrigin(ctx) { -ExpressionWithUnresolvedIdentifier(expression(ctx.expression), UnresolvedAttribute(_)) - } - /** * Create a (windowed) Function expression. */ @@ -2251,19 +2244,31 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { val filter = Option(ctx.where).map(expression(_)) val ignoreNulls = Option(ctx.nullsOption).map(_.getType == SqlBaseParser.IGNORE).getOrElse(false) -val funcCtx = ctx.functionName -val func = withFuncIdentClause( - funcCtx, - ident => UnresolvedFunction(ident, arguments, isDistinct, filter, ignoreNulls) -) -// Check if the function is evaluated in a windowed context. -ctx.windowSpec match { - case spec: WindowRefContext => -UnresolvedWindowExpression(func, visitWindowRef(spec)) - case spec: WindowDefContext => -WindowExpression(func, visitWindowDef(spec)) - case _ => func +// Is this an IDENTIFIER clause instead of a function call? +if (ctx.functionName.identFunc != null && +
[spark] branch branch-3.4 updated: [SPARK-45433][SQL][3.4] Fix CSV/JSON schema inference when timestamps do not match specified timestampFormat
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new f985d716e164 [SPARK-45433][SQL][3.4] Fix CSV/JSON schema inference when timestamps do not match specified timestampFormat f985d716e164 is described below commit f985d716e164885575ec7f36a7782694411da024 Author: Jia Fan AuthorDate: Thu Oct 12 17:09:48 2023 +0500 [SPARK-45433][SQL][3.4] Fix CSV/JSON schema inference when timestamps do not match specified timestampFormat ### What changes were proposed in this pull request? This is a backport PR of #43243. Fix the bug of schema inference when timestamps do not match specified timestampFormat. Please check #43243 for detail. ### Why are the changes needed? Fix schema inference bug on 3.4. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? add new test. ### Was this patch authored or co-authored using generative AI tooling? Closes #43343 from Hisoka-X/backport-SPARK-45433-inference-schema. 
Authored-by: Jia Fan Signed-off-by: Max Gekk --- .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 8 ++-- .../org/apache/spark/sql/catalyst/json/JsonInferSchema.scala | 7 +-- .../apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala| 10 ++ .../apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala | 8 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 51586a0065e9..dd8ac3985f19 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @@ -202,8 +203,11 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { // We can only parse the value as TimestampNTZType if it does not have zone-offset or // time-zone component and can be parsed with the timestamp formatter. // Otherwise, it is likely to be a timestamp with timezone. 
-if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { - SQLConf.get.timestampType +val timestampType = SQLConf.get.timestampType +if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY || +timestampType == TimestampNTZType) && +timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { + timestampType } else { tryParseTimestamp(field) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 5385afe8c935..7e4767750fd3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -148,11 +149,13 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { val bigDecimal = decimalParser(field) DecimalType(bigDecimal.precision, bigDecimal.scale) } +val timestampType = SQLConf.get.timestampType if (options.prefersDecimal && decimalTry.isDefined) { decimalTry.get -} else if (options.inferTimestamp && +} else if (options.inferTimestamp && (SQLConf.get.legacyTimeParserPolicy == + LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) && timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) { - SQLConf.get.timestampType + timestampType } else if (options.inferTimestamp && timestampFormatter.parseOptional(field).isDefined) { TimestampType diff --git
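The fix hinges on an "optional parse": a field is only inferred as a timestamp type when it actually parses under the configured format, and otherwise inference falls through. Spark's `parseWithoutTimeZoneOptional` is Scala, but the behavior can be sketched with plain `java.time` (illustrative only — this is not Spark's parser, and the `TimestampInferenceDemo` class is made up):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Optional;

public class TimestampInferenceDemo {
    // Mimics an "optional parse": empty when the field does not match the
    // specified pattern, so the caller can fall through to other types.
    static Optional<LocalDateTime> parseOptional(String field, String pattern) {
        try {
            return Optional.of(
                LocalDateTime.parse(field, DateTimeFormatter.ofPattern(pattern)));
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd'T'HH:mm:ss";
        // Matches the configured format -> can be inferred as a timestamp.
        System.out.println(parseOptional("2023-10-12T17:09:48", pattern).isPresent());
        // Does not match -> inference should not claim the column is a timestamp.
        System.out.println(parseOptional("12/10/2023 17:09", pattern).isPresent());
    }
}
```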
[spark] branch master updated: [SPARK-43664][CONNECT][PS] Raise exception for `ps.sql` with Pandas-on-Spark object on Spark Connect
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 470aaf32a43e [SPARK-43664][CONNECT][PS] Raise exception for `ps.sql` with Pandas-on-Spark object on Spark Connect

470aaf32a43e is described below

commit 470aaf32a43e3f778e28050df3b81ffd16cd7ff2
Author: Haejoon Lee
AuthorDate: Thu Oct 12 19:56:53 2023 +0900

    [SPARK-43664][CONNECT][PS] Raise exception for `ps.sql` with Pandas-on-Spark object on Spark Connect

    ### What changes were proposed in this pull request?

    This PR proposes to raise a proper exception for `ps.sql` with a Pandas-on-Spark DataFrame on Spark Connect.

    ### Why are the changes needed?

    To improve the error message.

    ### Does this PR introduce _any_ user-facing change?

    No API change, but it is an error message improvement.

    **Before**

    ```python
    >>> psdf = ps.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
    >>> ps.sql("SELECT {col}, {col2} FROM {tbl}", col=psdf.A, col2=psdf.B, tbl=psdf)
    Traceback (most recent call last):
    ...
    pyspark.errors.exceptions.connect.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `_pandas_api_32aa6c7b33ac442bab790cfb49f65ca1` cannot be found. Verify the spelling and correctness of the schema and catalog. If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog. To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 17;
    'Project ['A, 'B]
    +- 'UnresolvedRelation [_pandas_api_32aa6c7b33ac442bab790cfb49f65ca1], [], false

    JVM stacktrace:
    org.apache.spark.sql.catalyst.ExtendedAnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `_pandas_api_32aa6c7b33ac442bab790cfb49f65ca1` cannot be found. Verify the spelling and correctness of the schema and catalog.
    ...
    ```

    **After**

    ```python
    >>> psdf = ps.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
    >>> ps.sql("SELECT {col}, {col2} FROM {tbl}", col=psdf.A, col2=psdf.B, tbl=psdf)
    Traceback (most recent call last):
    ...
    pyspark.errors.exceptions.base.PySparkTypeError: [UNSUPPORTED_DATA_TYPE] Unsupported DataType `DataFrame`.
    ```

    ### How was this patch tested?

    The existing CI should pass.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

Closes #43237 from itholic/SPARK-43664.

Lead-authored-by: Haejoon Lee
Co-authored-by: Haejoon Lee <44108233+itho...@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon
---
 python/pyspark/pandas/sql_formatter.py                 | 13 +
 python/pyspark/pandas/tests/connect/test_parity_sql.py |  4 ++--
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/pandas/sql_formatter.py b/python/pyspark/pandas/sql_formatter.py
index 91c4f0b7d77b..9800037016c5 100644
--- a/python/pyspark/pandas/sql_formatter.py
+++ b/python/pyspark/pandas/sql_formatter.py
@@ -30,6 +30,8 @@ from pyspark.sql import SparkSession
 from pyspark.pandas.utils import default_session
 from pyspark.pandas.frame import DataFrame
 from pyspark.pandas.series import Series
+from pyspark.errors import PySparkTypeError
+from pyspark.sql.utils import is_remote

 __all__ = ["sql"]
@@ -59,6 +61,9 @@ def sql(
     Also the method can bind named parameters to SQL literals from `args`.

+    .. note::
+        pandas-on-Spark DataFrame is not supported for Spark Connect.
+
     Parameters
     ----------
     query : str
@@ -198,6 +203,14 @@ def sql(
     session = default_session()
     formatter = PandasSQLStringFormatter(session)
     try:
+        # ps.DataFrame are not supported for Spark Connect currently.
+        if is_remote():
+            for obj in kwargs.values():
+                if isinstance(obj, ps.DataFrame):
+                    raise PySparkTypeError(
+                        error_class="UNSUPPORTED_DATA_TYPE",
+                        message_parameters={"data_type": type(obj).__name__},
+                    )
         sdf = session.sql(formatter.format(query, **kwargs), args)
     finally:
         formatter.clear()
diff --git a/python/pyspark/pandas/tests/connect/test_parity_sql.py b/python/pyspark/pandas/tests/connect/test_parity_sql.py
index c042de6b9007..2e503cac07a8 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_sql.py
+++ b/python/pyspark/pandas/tests/connect/test_parity_sql.py
@@ -22,11 +22,11 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils

 class SQLParityTests(SQLTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase):
-    @unittest.skip("TODO(SPARK-43664): Fix TABLE_OR_VIEW_NOT_FOUND from SQLParityTests.")
+    @unittest.skip("Test depends on temp view issue on
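The guard added to `sql_formatter.py` is small enough to reproduce standalone. Below is a runnable sketch: `PySparkTypeError` and `DataFrame` are local stand-ins for the real pyspark classes (so the sketch runs without a Spark install), and `check_sql_bindings` is a hypothetical helper name, not an actual pyspark function.

```python
class PySparkTypeError(TypeError):
    """Stand-in for pyspark.errors.PySparkTypeError."""
    def __init__(self, error_class, message_parameters):
        self.error_class = error_class
        super().__init__(
            f"[{error_class}] Unsupported DataType "
            f"`{message_parameters['data_type']}`."
        )

class DataFrame:
    """Stand-in for pyspark.pandas.DataFrame."""

def check_sql_bindings(is_remote, **kwargs):
    # Same shape as the patched ps.sql: on Spark Connect, reject
    # pandas-on-Spark DataFrames before the query is formatted, instead
    # of failing later with TABLE_OR_VIEW_NOT_FOUND on a temp view.
    if is_remote:
        for obj in kwargs.values():
            if isinstance(obj, DataFrame):
                raise PySparkTypeError(
                    error_class="UNSUPPORTED_DATA_TYPE",
                    message_parameters={"data_type": type(obj).__name__},
                )

try:
    check_sql_bindings(True, tbl=DataFrame())
except PySparkTypeError as e:
    print(e)  # [UNSUPPORTED_DATA_TYPE] Unsupported DataType `DataFrame`.
```

The design point is fail-fast: a type check at the API boundary converts a confusing analyzer-side error into an immediate, actionable client-side one.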
[spark] branch master updated: [SPARK-45510][SQL] Replace `scala.collection.generic.Growable` to `scala.collection.mutable.Growable`
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 385a2f5f0475 [SPARK-45510][SQL] Replace `scala.collection.generic.Growable` to `scala.collection.mutable.Growable`

385a2f5f0475 is described below

commit 385a2f5f0475ce63180abc7ebb7577e0214ca2fb
Author: Jia Fan
AuthorDate: Thu Oct 12 16:48:36 2023 +0800

    [SPARK-45510][SQL] Replace `scala.collection.generic.Growable` to `scala.collection.mutable.Growable`

    ### What changes were proposed in this pull request?

    Since Scala 2.13.0, `scala.collection.generic.Growable` has been marked as deprecated. This PR changes it to `scala.collection.mutable.Growable`.

    ### Why are the changes needed?

    To remove a deprecated API.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

Closes #43347 from Hisoka-X/SPARK-45510-replace-growable.
Authored-by: Jia Fan
Signed-off-by: yangjie01
---
 .../org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 7bbc930ceab5..d0d4ca659057 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.catalyst.expressions.aggregate

-import scala.collection.generic.Growable
 import scala.collection.mutable
+import scala.collection.mutable.Growable

 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45488][SQL] XML: Add support for value in 'rowTag' element
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new e69752aa66c0 [SPARK-45488][SQL] XML: Add support for value in 'rowTag' element

e69752aa66c0 is described below

commit e69752aa66c09df843adaebe86a63cf799961292
Author: Shujing Yang
AuthorDate: Thu Oct 12 16:17:37 2023 +0900

    [SPARK-45488][SQL] XML: Add support for value in 'rowTag' element

    ### What changes were proposed in this pull request?

    The following XML with rowTag 'book' will yield a schema with just the "_id" column and not the value (the element tags were lost in the original email rendering; the sample had this shape):

    ```
    <book id="...">Great Book</book>
    ```

    Let's parse the value as well. The scope of this PR is to keep the `valueTag` behavior of the rowTag element consistent with that of inner objects.

    ### Why are the changes needed?

    The semantics for attributes and `valueTag` should be consistent.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Unit test.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

Closes #43319 from shujingyang-db/rootlevel-valuetag.
Lead-authored-by: Shujing Yang
Co-authored-by: Shujing Yang <135740748+shujingyang...@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon
---
 .../spark/sql/catalyst/xml/StaxXmlParser.scala     | 17 -
 .../spark/sql/catalyst/xml/XmlInferSchema.scala    | 16 +
 .../xml-resources/root-level-value-none.xml        |  8 +++
 .../test-data/xml-resources/root-level-value.xml   |  9 +++
 .../sql/execution/datasources/xml/XmlSuite.scala   | 75 ++
 5 files changed, 123 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index ac29e234e5f9..dcb760aca9d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -103,7 +103,12 @@ class StaxXmlParser(
       }
       val parser = StaxXmlParserUtils.filteredReader(xml)
       val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
-      Some(convertObject(parser, schema, options, rootAttributes))
+      // A structure object is an attribute-only element
+      // if it only consists of attributes and valueTags.
+      val isRootAttributesOnly = schema.fields.forall { f =>
+        f.name == options.valueTag || f.name.startsWith(options.attributePrefix)
+      }
+      Some(convertObject(parser, schema, options, rootAttributes, isRootAttributesOnly))
     } catch {
       case e: SparkUpgradeException => throw e
       case e@(_: RuntimeException | _: XMLStreamException | _: MalformedInputException
@@ -305,7 +310,8 @@ class StaxXmlParser(
       parser: XMLEventReader,
       schema: StructType,
       options: XmlOptions,
-      rootAttributes: Array[Attribute] = Array.empty): InternalRow = {
+      rootAttributes: Array[Attribute] = Array.empty,
+      isRootAttributesOnly: Boolean = false): InternalRow = {
     val row = new Array[Any](schema.length)
     val nameToIndex = schema.map(_.name).zipWithIndex.toMap
     // If there are attributes, then we process them first.
@@ -371,6 +377,13 @@ class StaxXmlParser(
             badRecordException = badRecordException.orElse(Some(e))
         }

+        case c: Characters if !c.isWhiteSpace && isRootAttributesOnly =>
+          nameToIndex.get(options.valueTag) match {
+            case Some(index) =>
+              row(index) = convertTo(c.getData, schema(index).dataType, options)
+            case None => // do nothing
+          }
+
         case _: EndElement =>
           shouldStop = StaxXmlParserUtils.checkEndElement(parser)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
index 3eabf4525b4e..8bddb8f5bd99 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
@@ -219,12 +219,28 @@ private[sql] object XmlInferSchema {
           dataTypes += inferredType
           nameToDataType += (field -> dataTypes)

+        case c: Characters if !c.isWhiteSpace =>
+          // This can be an attribute-only object
+          val valueTagType = inferFrom(c.getData, options)
+          nameToDataType += options.valueTag -> ArrayBuffer(valueTagType)
+
         case _: EndElement =>
           shouldStop = StaxXmlParserUtils.checkEndElement(parser)

         case _ => // do nothing
       }
     }

+    // A structure object is an
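The new "attribute-only" test in `StaxXmlParser` (and the matching inference step in `XmlInferSchema`) boils down to a predicate over the row schema's field names. Here is a plain-Python rendering of that predicate — a sketch, assuming the defaults for the XML data source's `valueTag` (`_VALUE`) and `attributePrefix` (`_`) options; `is_root_attributes_only` is an illustrative name, not a pyspark API.

```python
def is_root_attributes_only(field_names, value_tag="_VALUE",
                            attribute_prefix="_"):
    """True when a row schema consists solely of attribute fields and the
    valueTag field, i.e. character data directly under rowTag should be
    captured as the element's value rather than ignored."""
    return all(
        name == value_tag or name.startswith(attribute_prefix)
        for name in field_names
    )

# An element like <book id="...">Great Book</book> infers ["_id", "_VALUE"]:
print(is_root_attributes_only(["_id", "_VALUE"]))  # True
# A row with child elements keeps the pre-patch behavior:
print(is_root_attributes_only(["_id", "title"]))   # False
```

When the predicate holds, the parser routes the `Characters` event into the `valueTag` column, which is why the attribute-only `rowTag` element now yields its text value in addition to `_id`.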